diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index f68ae3ed7..1a0560125 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -12,6 +12,7 @@
 #include "db/db_test_util.h"
 #include "options/options_helper.h"
 #include "port/stack_trace.h"
+#include "rocksdb/filter_policy.h"
 #include "rocksdb/flush_block_policy.h"
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/perf_context.h"
@@ -2066,6 +2067,220 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) {
 INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
                         testing::Combine(testing::Bool(), testing::Bool()));
 
+#if USE_COROUTINES
+class DBMultiGetAsyncIOTest : public DBBasicTest {
+ public:
+  DBMultiGetAsyncIOTest()
+      : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
+    BlockBasedTableOptions bbto;
+    bbto.filter_policy.reset(NewBloomFilterPolicy(10));
+    Options options = CurrentOptions();
+    options.disable_auto_compactions = true;
+    options.statistics = statistics_;
+    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+    Reopen(options);
+    int num_keys = 0;
+
+    // Put all keys in the bottommost level, and overwrite some keys
+    // in L0 and L1
+    for (int i = 0; i < 128; ++i) {
+      EXPECT_OK(Put(Key(i), "val_l2_" + std::to_string(i)));
+      num_keys++;
+      if (num_keys == 8) {
+        EXPECT_OK(Flush());
+        num_keys = 0;
+      }
+    }
+    if (num_keys > 0) {
+      EXPECT_OK(Flush());
+      num_keys = 0;
+    }
+    MoveFilesToLevel(2);
+
+    for (int i = 0; i < 128; i += 3) {
+      EXPECT_OK(Put(Key(i), "val_l1_" + std::to_string(i)));
+      num_keys++;
+      if (num_keys == 8) {
+        EXPECT_OK(Flush());
+        num_keys = 0;
+      }
+    }
+    if (num_keys > 0) {
+      EXPECT_OK(Flush());
+      num_keys = 0;
+    }
+    MoveFilesToLevel(1);
+
+    for (int i = 0; i < 128; i += 5) {
+      EXPECT_OK(Put(Key(i), "val_l0_" + std::to_string(i)));
+      num_keys++;
+      if (num_keys == 8) {
+        EXPECT_OK(Flush());
+        num_keys = 0;
+      }
+    }
+    if (num_keys > 0) {
+      EXPECT_OK(Flush());
+      num_keys = 0;
+    }
+    EXPECT_EQ(0, num_keys);
+  }
+
+  const std::shared_ptr<Statistics>& statistics() { return statistics_; }
+
+ private:
+  std::shared_ptr<Statistics> statistics_;
+};
+
+TEST_F(DBMultiGetAsyncIOTest, GetFromL0) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+
+  // All 3 keys in L0. The L0 files should be read serially.
+  key_strs.push_back(Key(0));
+  key_strs.push_back(Key(40));
+  key_strs.push_back(Key(80));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+
+  ReadOptions ro;
+  ro.async_io = true;
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(values[0], "val_l0_" + std::to_string(0));
+  ASSERT_EQ(values[1], "val_l0_" + std::to_string(40));
+  ASSERT_EQ(values[2], "val_l0_" + std::to_string(80));
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // No async IO in this case since we don't do parallel lookup in L0
+  ASSERT_EQ(multiget_io_batch_size.count, 0);
+  ASSERT_EQ(multiget_io_batch_size.max, 0);
+}
+
+TEST_F(DBMultiGetAsyncIOTest, GetFromL1) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+
+  key_strs.push_back(Key(33));
+  key_strs.push_back(Key(54));
+  key_strs.push_back(Key(102));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+
+  ReadOptions ro;
+  ro.async_io = true;
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::OK());
+  ASSERT_EQ(statuses[2], Status::OK());
+  ASSERT_EQ(values[0], "val_l1_" + std::to_string(33));
+  ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
+  ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // A batch of 3 async IOs is expected, one for each overlapping file in L1
+  ASSERT_EQ(multiget_io_batch_size.count, 1);
+  ASSERT_EQ(multiget_io_batch_size.max, 3);
+}
+
+TEST_F(DBMultiGetAsyncIOTest, LastKeyInFile) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+
+  // 21 is the last key in the first L1 file
+  key_strs.push_back(Key(21));
+  key_strs.push_back(Key(54));
+  key_strs.push_back(Key(102));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+
+  ReadOptions ro;
+  ro.async_io = true;
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::OK());
+  ASSERT_EQ(statuses[2], Status::OK());
+  ASSERT_EQ(values[0], "val_l1_" + std::to_string(21));
+  ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
+  ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // Since the first MultiGet key is the last key in a file, the MultiGet is
+  // expected to look up in that file first, before moving on to other files.
+  // So the first file lookup will issue one async read, and the next lookup
+  // will read 2 files in parallel and issue 2 async reads
+  ASSERT_EQ(multiget_io_batch_size.count, 2);
+  ASSERT_EQ(multiget_io_batch_size.max, 2);
+}
+
+TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+
+  // 33 and 102 are in L1, and 56 is in L2
+  key_strs.push_back(Key(33));
+  key_strs.push_back(Key(56));
+  key_strs.push_back(Key(102));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+
+  ReadOptions ro;
+  ro.async_io = true;
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::OK());
+  ASSERT_EQ(statuses[2], Status::OK());
+  ASSERT_EQ(values[0], "val_l1_" + std::to_string(33));
+  ASSERT_EQ(values[1], "val_l2_" + std::to_string(56));
+  ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // There is only one MultiGet key in the bottommost level - 56. Thus
+  // the bottommost level will not use async IO.
+  ASSERT_EQ(multiget_io_batch_size.count, 1);
+  ASSERT_EQ(multiget_io_batch_size.max, 2);
+}
+#endif  // USE_COROUTINES
+
 TEST_F(DBBasicTest, MultiGetStats) {
   Options options;
   options.create_if_missing = true;
@@ -3435,6 +3650,11 @@ class DeadlineRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
   IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
                      const IOOptions& options, IODebugContext* dbg) override;
 
+  IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
+                     std::function<void(const FSReadRequest&, void*)> cb,
+                     void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
+                     IODebugContext* dbg) override;
+
  private:
   DeadlineFS& fs_;
   std::unique_ptr<FSRandomAccessFile> file_;
@@ -3575,6 +3795,26 @@ IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len,
   return s;
 }
 
+IOStatus DeadlineRandomAccessFile::ReadAsync(
+    FSReadRequest& req, const IOOptions& opts,
+    std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
+    void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
+  const std::chrono::microseconds deadline = fs_.GetDeadline();
+  const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();
+  IOStatus s;
+  if (deadline.count() || io_timeout.count()) {
+    fs_.AssertDeadline(deadline, io_timeout, opts);
+  }
+  if (s.ok()) {
+    s = FSRandomAccessFileWrapper::ReadAsync(req, opts, cb, cb_arg, io_handle,
+                                             del_fn, dbg);
+  }
+  if (s.ok()) {
+    s = fs_.ShouldDelay(opts);
+  }
+  return s;
+}
+
 IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
                                              size_t num_reqs,
                                              const IOOptions& options,
@@ -3596,7 +3836,8 @@ IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs,
 
 // A test class for intercepting random reads and injecting artificial
 // delays. Used for testing the MultiGet deadline feature
-class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
+class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet,
+                                    public testing::WithParamInterface<bool> {
  public:
   DBBasicTestMultiGetDeadline()
       : DBBasicTestMultiGet(
@@ -3619,7 +3860,13 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
   }
 };
 
-TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
+TEST_P(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
+#ifndef USE_COROUTINES
+  if (GetParam()) {
+    ROCKSDB_GTEST_SKIP("This test requires coroutine support");
+    return;
+  }
+#endif  // USE_COROUTINES
   std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false);
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
   Options options = CurrentOptions();
@@ -3650,6 +3897,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
 
   ReadOptions ro;
   ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+  ro.async_io = GetParam();
   // Delay the first IO
   fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0);
 
@@ -3752,6 +4000,9 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
   Close();
 }
 
+INSTANTIATE_TEST_CASE_P(DeadlineIO, DBBasicTestMultiGetDeadline,
+                        ::testing::Bool());
+
 TEST_F(DBBasicTest, ManifestWriteFailure) {
   Options options = GetDefaultOptions();
   options.create_if_missing = true;
diff --git a/db/version_set.cc b/db/version_set.cc
index 40eb29188..70953f0e2 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -592,6 +592,10 @@ class FilePickerMultiGet {
 
   const MultiGetRange& CurrentFileRange() { return current_file_range_; }
 
+  bool RemainingOverlapInLevel() {
+    return !current_level_range_.Suffix(current_file_range_).empty();
+  }
+
  private:
   unsigned int num_levels_;
   unsigned int curr_level_;
@@ -2231,7 +2235,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
     // L0 files won't be parallelized anyway. The regular synchronous version
     // is faster.
     if (!read_options.async_io || !using_coroutines() ||
-        fp.GetHitFileLevel() == 0) {
+        fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) {
       if (f) {
         // Call MultiGetFromSST for looking up a single file
         s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
diff --git a/table/multiget_context.h b/table/multiget_context.h
index 8778a2dcb..ca29816f5 100644
--- a/table/multiget_context.h
+++ b/table/multiget_context.h
@@ -290,6 +290,18 @@ class MultiGetContext {
 
     MultiGetContext* context() const { return ctx_; }
 
+    Range Suffix(const Range& other) const {
+      size_t other_last = other.FindLastRemaining();
+      size_t my_last = FindLastRemaining();
+
+      if (my_last > other_last) {
+        return Range(*this, Iterator(this, other_last),
+                     Iterator(this, my_last));
+      } else {
+        return Range(*this, begin(), begin());
+      }
+    }
+
    private:
     friend MultiGetContext;
     MultiGetContext* ctx_;
@@ -306,6 +318,15 @@ class MultiGetContext {
       return (((Mask{1} << end_) - 1) & ~((Mask{1} << start_) - 1) &
               ~(ctx_->value_mask_ | skip_mask_));
     }
+
+    size_t FindLastRemaining() const {
+      Mask mask = RemainingMask();
+      size_t index = (mask >>= start_) ? start_ : 0;
+      while (mask >>= 1) {
+        index++;
+      }
+      return index;
+    }
   };
 
   // Return the initial range that encompasses all the keys in the batch
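
[Reviewer note, not part of the patch] A rough standalone sketch of the bitmask bookkeeping behind the new Suffix()/FindLastRemaining() helpers, assuming a plain uint64_t in place of MultiGetContext::Mask and bare indices in place of Range iterators; ToyRange and its fields are invented for illustration and simplify the real MultiGetContext::Range. FindLastRemaining() returns the index of the highest still-pending key at or above the range's start, and Suffix() yields the part of the level's key range that lies beyond the batch just dispatched to a file; RemainingOverlapInLevel() only checks whether that suffix is non-empty.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>

// Standalone illustration only; ToyRange is a simplified stand-in for
// MultiGetContext::Range. A range is [start, end) over a MultiGet batch,
// and `remaining` has bit i set if key i still needs a lookup.
struct ToyRange {
  uint64_t remaining;
  size_t start;
  size_t end;

  // Index of the highest remaining key at or above `start` (mirrors the
  // shift-and-count loop added to MultiGetContext::Range).
  size_t FindLastRemaining() const {
    uint64_t mask = remaining;
    size_t index = (mask >>= start) ? start : 0;
    while (mask >>= 1) {
      index++;
    }
    return index;
  }

  // Portion of *this that lies beyond the last key of `other`; empty when
  // `other` already extends at least as far as *this.
  ToyRange Suffix(const ToyRange& other) const {
    size_t other_last = other.FindLastRemaining();
    size_t my_last = FindLastRemaining();
    if (my_last > other_last) {
      return ToyRange{remaining, other_last, my_last};
    }
    return ToyRange{remaining, start, start};  // empty
  }

  bool empty() const { return start >= end; }
};

int main() {
  // The level's range covers keys 0..4; the file just dispatched covered 0..1.
  ToyRange level{/*remaining=*/0x1b, /*start=*/0, /*end=*/5};  // keys 0,1,3,4
  ToyRange file{/*remaining=*/0x03, /*start=*/0, /*end=*/2};   // keys 0,1
  ToyRange suffix = level.Suffix(file);
  // Keys remain in the level beyond the current file, so the level still has
  // overlap and the parallel (coroutine) lookup path is worth taking.
  assert(!suffix.empty());
  std::printf("suffix covers [%zu, %zu)\n", suffix.start, suffix.end);
  return 0;
}
```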
diff --git a/util/async_file_reader.cc b/util/async_file_reader.cc
index e580ba86e..f6d76842e 100644
--- a/util/async_file_reader.cc
+++ b/util/async_file_reader.cc
@@ -19,6 +19,8 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
   awaiter->io_handle_.resize(awaiter->num_reqs_);
   awaiter->del_fn_.resize(awaiter->num_reqs_);
   for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
+    awaiter->io_handle_[i] = nullptr;
+    awaiter->del_fn_[i] = nullptr;
     awaiter->file_
         ->ReadAsync(
             awaiter->read_reqs_[i], awaiter->opts_,
@@ -44,10 +46,12 @@ void AsyncFileReader::Poll() {
   waiter = head_;
   do {
     for (size_t i = 0; i < waiter->num_reqs_; ++i) {
-      io_handles.push_back(waiter->io_handle_[i]);
+      if (waiter->io_handle_[i]) {
+        io_handles.push_back(waiter->io_handle_[i]);
+      }
     }
   } while (waiter != tail_ && (waiter = waiter->next_));
-  {
+  if (io_handles.size() > 0) {
     StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS);
     fs_->Poll(io_handles, io_handles.size()).PermitUncheckedError();
   }
@@ -56,7 +60,9 @@ void AsyncFileReader::Poll() {
     head_ = waiter->next_;
 
     for (size_t i = 0; i < waiter->num_reqs_; ++i) {
-      waiter->del_fn_[i](waiter->io_handle_[i]);
+      if (waiter->io_handle_[i] && waiter->del_fn_[i]) {
+        waiter->del_fn_[i](waiter->io_handle_[i]);
+      }
     }
     waiter->awaiting_coro_.resume();
   } while (waiter != tail_);
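
[Reviewer note, not part of the patch] For context, a minimal sketch of how an application would exercise the code paths these tests cover. It assumes a build with coroutine support (USE_COROUTINES) and a FileSystem whose ReadAsync() works (e.g. io_uring on Linux); the DB path and keys are placeholders. With ReadOptions::async_io set, MultiGet() can read the files overlapping a level in parallel, and the per-level batch sizes land in the MULTIGET_IO_BATCH_SIZE histogram that the new tests assert on.

```cpp
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  Options options;
  options.create_if_missing = true;
  // Statistics are needed to observe the async IO batch sizes.
  options.statistics = CreateDBStatistics();

  DB* db = nullptr;
  Status s = DB::Open(options, "/tmp/multiget_async_io_demo", &db);
  if (!s.ok()) {
    return 1;
  }

  std::vector<std::string> key_strs = {"key1", "key2", "key3"};
  std::vector<Slice> keys(key_strs.begin(), key_strs.end());
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  ReadOptions ro;
  ro.async_io = true;  // opt in to parallel reads of overlapping files
  db->MultiGet(ro, db->DefaultColumnFamily(), keys.size(), keys.data(),
               values.data(), statuses.data());

  // How many async reads were issued together per level lookup.
  HistogramData batch_size;
  options.statistics->histogramData(MULTIGET_IO_BATCH_SIZE, &batch_size);

  delete db;
  return 0;
}
```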