From 832b056a302b47ef7d85c366a1012562863f9c11 Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 7 Aug 2020 11:59:19 -0700 Subject: [PATCH] Enable IO timeouts for iterators (#7161) Summary: Introduce io_timeout in ReadOptions and enable deadline/io_timeout for Iterators. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7161 Test Plan: New unit tests in db_basic_test Reviewed By: riversand963 Differential Revision: D22687352 Pulled By: anand1976 fbshipit-source-id: 67bbb0e6d7ae80b256589244468494292538c6ec --- db/db_basic_test.cc | 278 +++++++++++++----- db/db_impl/db_impl.cc | 4 - file/file_util.h | 9 +- include/rocksdb/options.h | 9 +- options/options.cc | 2 + table/block_based/block_based_table_reader.cc | 4 +- table/block_based/partitioned_index_reader.cc | 1 + 7 files changed, 222 insertions(+), 85 deletions(-) diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 2bffbc6bb..4f70541a9 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2829,12 +2829,11 @@ class DeadlineFS; class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper { public: - DeadlineRandomAccessFile(DeadlineFS& fs, SpecialEnv* env, + DeadlineRandomAccessFile(DeadlineFS& fs, std::unique_ptr& file) : FSRandomAccessFileWrapper(file.get()), fs_(fs), - file_(std::move(file)), - env_(env) {} + file_(std::move(file)) {} IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts, Slice* result, char* scratch, @@ -2846,18 +2845,22 @@ class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper { private: DeadlineFS& fs_; std::unique_ptr file_; - SpecialEnv* env_; }; class DeadlineFS : public FileSystemWrapper { public: - explicit DeadlineFS(SpecialEnv* env) + // The error_on_delay parameter specifies whether an IOStatus::TimedOut() + // status should be returned after delaying the IO to exceed the timeout, + // or to simply delay but return success anyway. 
The latter mimics the + // behavior of PosixFileSystem, which does not enforce any timeout + explicit DeadlineFS(SpecialEnv* env, bool error_on_delay) : FileSystemWrapper(FileSystem::Default()), - delay_idx_(0), deadline_(std::chrono::microseconds::zero()), + io_timeout_(std::chrono::microseconds::zero()), env_(env), timedout_(false), - ignore_deadline_(false) {} + ignore_deadline_(false), + error_on_delay_(error_on_delay) {} IOStatus NewRandomAccessFile(const std::string& fname, const FileOptions& opts, @@ -2867,100 +2870,111 @@ class DeadlineFS : public FileSystemWrapper { IOStatus s; s = target()->NewRandomAccessFile(fname, opts, &file, dbg); - result->reset(new DeadlineRandomAccessFile(*this, env_, file)); + result->reset(new DeadlineRandomAccessFile(*this, file)); - int delay; const std::chrono::microseconds deadline = GetDeadline(); - if (deadline.count()) { - AssertDeadline(deadline, opts.io_options); + const std::chrono::microseconds io_timeout = GetIOTimeout(); + if (deadline.count() || io_timeout.count()) { + AssertDeadline(deadline, io_timeout, opts.io_options); } - if (ShouldDelay(&delay, &s)) { - env_->SleepForMicroseconds(delay); - } - return s; + return ShouldDelay(opts.io_options); } // Set a vector of {IO counter, delay in microseconds, return status} tuples // that control when to inject a delay and duration of the delay - void SetDelaySequence( - const std::chrono::microseconds deadline, - const std::vector>&& seq) { - int total_delay = 0; - for (auto& seq_iter : seq) { - // Ensure no individual delay is > 500ms - ASSERT_LT(std::get<1>(seq_iter), 500000); - total_delay += std::get<1>(seq_iter); - } - // ASSERT total delay is < 1s. 
This is mainly to keep the test from - // timing out in CI test frameworks - ASSERT_LT(total_delay, 1000000); - delay_seq_ = seq; - delay_idx_ = 0; + void SetDelayTrigger(const std::chrono::microseconds deadline, + const std::chrono::microseconds io_timeout, + const int trigger) { + delay_trigger_ = trigger; io_count_ = 0; deadline_ = deadline; + io_timeout_ = io_timeout; timedout_ = false; } // Increment the IO counter and return a delay in microseconds - bool ShouldDelay(int* delay, IOStatus* s) { - if (!ignore_deadline_ && delay_idx_ < delay_seq_.size() && - std::get<0>(delay_seq_[delay_idx_]) == io_count_++) { - *delay = std::get<1>(delay_seq_[delay_idx_]); - *s = std::get<2>(delay_seq_[delay_idx_]); - delay_idx_++; - timedout_ = true; - return true; + IOStatus ShouldDelay(const IOOptions& opts) { + if (!deadline_.count() && !io_timeout_.count()) { + return IOStatus::OK(); } - *s = IOStatus::OK(); - return false; + if (!ignore_deadline_ && delay_trigger_ == io_count_++) { + env_->SleepForMicroseconds(static_cast(opts.timeout.count() + 1)); + timedout_ = true; + if (error_on_delay_) { + return IOStatus::TimedOut(); + } + } + return IOStatus::OK(); } const std::chrono::microseconds GetDeadline() { return ignore_deadline_ ? std::chrono::microseconds::zero() : deadline_; } + const std::chrono::microseconds GetIOTimeout() { + return ignore_deadline_ ? 
std::chrono::microseconds::zero() : io_timeout_; + } + bool TimedOut() { return timedout_; } void IgnoreDeadline(bool ignore) { ignore_deadline_ = ignore; } void AssertDeadline(const std::chrono::microseconds deadline, + const std::chrono::microseconds io_timeout, const IOOptions& opts) const { // Give a leeway of +- 10us as it can take some time for the Get/ // MultiGet call to reach here, in order to avoid false alarms std::chrono::microseconds now = std::chrono::microseconds(env_->NowMicros()); - if (deadline - now != opts.timeout) { - ASSERT_EQ(deadline - now, opts.timeout); + std::chrono::microseconds timeout; + if (deadline.count()) { + timeout = deadline - now; + if (io_timeout.count()) { + timeout = std::min(timeout, io_timeout); + } + } else { + timeout = io_timeout; + } + if (opts.timeout != timeout) { + ASSERT_EQ(timeout, opts.timeout); } } private: - std::vector> delay_seq_; - size_t delay_idx_; + // The number of IOs to trigger the delay after + int delay_trigger_; + // Current IO count int io_count_; + // ReadOptions deadline for the Get/MultiGet/Iterator std::chrono::microseconds deadline_; + // ReadOptions io_timeout for the Get/MultiGet/Iterator + std::chrono::microseconds io_timeout_; SpecialEnv* env_; + // Flag to indicate whether we injected a delay bool timedout_; + // Temporarily ignore deadlines/timeouts bool ignore_deadline_; + // Return IOStatus::TimedOut() or IOStatus::OK() + bool error_on_delay_; }; IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len, const IOOptions& opts, Slice* result, char* scratch, IODebugContext* dbg) const { - int delay; const std::chrono::microseconds deadline = fs_.GetDeadline(); + const std::chrono::microseconds io_timeout = fs_.GetIOTimeout(); IOStatus s; - if (deadline.count()) { - fs_.AssertDeadline(deadline, opts); - } - if (fs_.ShouldDelay(&delay, &s)) { - env_->SleepForMicroseconds(delay); + if (deadline.count() || io_timeout.count()) { + fs_.AssertDeadline(deadline, io_timeout, opts); } if 
(s.ok()) { s = FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch, dbg); } + if (s.ok()) { + s = fs_.ShouldDelay(opts); + } return s; } @@ -2968,23 +2982,23 @@ IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, IODebugContext* dbg) { - int delay; const std::chrono::microseconds deadline = fs_.GetDeadline(); + const std::chrono::microseconds io_timeout = fs_.GetIOTimeout(); IOStatus s; - if (deadline.count()) { - fs_.AssertDeadline(deadline, options); - } - if (fs_.ShouldDelay(&delay, &s)) { - env_->SleepForMicroseconds(delay); + if (deadline.count() || io_timeout.count()) { + fs_.AssertDeadline(deadline, io_timeout, options); } if (s.ok()) { s = FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg); } + if (s.ok()) { + s = fs_.ShouldDelay(options); + } return s; } // A test class for intercepting random reads and injecting artificial -// delays. Used for testing the deadline/timeout feature +// delays. 
Used for testing the MultiGet deadline feature class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { public: DBBasicTestMultiGetDeadline() @@ -3000,14 +3014,16 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { if (i < num_ok) { EXPECT_OK(statuses[i]); } else { - EXPECT_EQ(statuses[i], Status::TimedOut()); + if (statuses[i] != Status::TimedOut()) { + EXPECT_EQ(statuses[i], Status::TimedOut()); + } } } } }; TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { - std::shared_ptr fs = std::make_shared(env_); + std::shared_ptr fs = std::make_shared(env_, false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); Options options = CurrentOptions(); env_->SetTimeElapseOnlySleep(&options); @@ -3037,9 +3053,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { ReadOptions ro; ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - // Delay the first IO by 200ms - fs->SetDelaySequence( - ro.deadline, {std::tuple{0, 20000, IOStatus::OK()}}); + // Delay the first IO + fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0); std::vector statuses = dbfull()->MultiGet(ro, cfs, keys, &values); // The first key is successful because we check after the lookup, but @@ -3064,8 +3079,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { keys[i] = Slice(key_str[i].data(), key_str[i].size()); } ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence( - ro.deadline, {std::tuple{1, 20000, IOStatus::OK()}}); + fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1); statuses = dbfull()->MultiGet(ro, cfs, keys, &values); CheckStatus(statuses, 3); @@ -3079,8 +3093,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { statuses.clear(); statuses.resize(keys.size()); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence( - ro.deadline, {std::tuple{0, 20000, IOStatus::OK()}}); + fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0); 
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); CheckStatus(statuses, 2); @@ -3095,8 +3108,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { statuses.clear(); statuses.resize(keys.size()); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence( - ro.deadline, {std::tuple{2, 20000, IOStatus::OK()}}); + fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 2); dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); CheckStatus(statuses, 6); @@ -3110,8 +3122,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { statuses.clear(); statuses.resize(keys.size()); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence( - ro.deadline, {std::tuple{3, 20000, IOStatus::OK()}}); + fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 3); dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); CheckStatus(statuses, 8); @@ -3137,8 +3148,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { statuses.clear(); statuses.resize(keys.size()); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence( - ro.deadline, {std::tuple{1, 20000, IOStatus::OK()}}); + fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 1); dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(), pin_values.data(), statuses.data()); CheckStatus(statuses, 64); @@ -3172,9 +3182,17 @@ TEST_F(DBBasicTest, ManifestWriteFailure) { Reopen(options); } -TEST_F(DBBasicTest, PointLookupDeadline) { - std::shared_ptr fs = std::make_shared(env_); +// A test class for intercepting random reads and injecting artificial +// delays. 
Used for testing the deadline/timeout feature +class DBBasicTestDeadline + : public DBBasicTest, + public testing::WithParamInterface> {}; + +TEST_P(DBBasicTestDeadline, PointLookupDeadline) { + std::shared_ptr fs = std::make_shared(env_, true); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + bool set_deadline = std::get<0>(GetParam()); + bool set_timeout = std::get<1>(GetParam()); // Since we call SetTimeElapseOnlySleep, Close() later on may not work // properly for the DB that's opened by the DBTestBase constructor. @@ -3241,10 +3259,13 @@ TEST_F(DBBasicTest, PointLookupDeadline) { // and cause the Get() to fail. while (timedout) { ReadOptions ro; - ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence( - ro.deadline, {std::tuple{ - io_deadline_trigger, 20000, IOStatus::TimedOut()}}); + if (set_deadline) { + ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + } + if (set_timeout) { + ro.io_timeout = std::chrono::microseconds{5000}; + } + fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger); block_cache->SetCapacity(0); block_cache->SetCapacity(1048576); @@ -3260,11 +3281,112 @@ TEST_F(DBBasicTest, PointLookupDeadline) { io_deadline_trigger++; } // Reset the delay sequence in order to avoid false alarms during Reopen - fs->SetDelaySequence(std::chrono::microseconds::zero(), {}); + fs->SetDelayTrigger(std::chrono::microseconds::zero(), + std::chrono::microseconds::zero(), 0); } Close(); } +TEST_P(DBBasicTestDeadline, IteratorDeadline) { + std::shared_ptr fs = std::make_shared(env_, true); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + bool set_deadline = std::get<0>(GetParam()); + bool set_timeout = std::get<1>(GetParam()); + + // Since we call SetTimeElapseOnlySleep, Close() later on may not work + // properly for the DB that's opened by the DBTestBase constructor. 
+ Close(); + for (int option_config = kDefault; option_config < kEnd; ++option_config) { + if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) { + continue; + } + Options options = CurrentOptions(); + if (options.use_direct_reads) { + continue; + } + options.env = env.get(); + options.disable_auto_compactions = true; + Cache* block_cache = nullptr; + env_->SetTimeElapseOnlySleep(&options); + // DB open will create table readers unless we reduce the table cache + // capacity. + // SanitizeOptions will set max_open_files to minimum of 20. Table cache + // is allocated with max_open_files - 10 as capacity. So override + // max_open_files to 11 so table cache capacity will become 1. This will + // prevent file open during DB open and force the file to be opened + // during iteration + SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = (int*)arg; + *max_open_files = 11; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Reopen(options); + + if (options.table_factory && + !strcmp(options.table_factory->Name(), + BlockBasedTableFactory::kName.c_str())) { + BlockBasedTableFactory* bbtf = + static_cast(options.table_factory.get()); + block_cache = bbtf->table_options().block_cache.get(); + } + + Random rnd(301); + for (int i = 0; i < 400; ++i) { + std::string key = "k" + ToString(i); + Put(key, rnd.RandomString(100)); + } + Flush(); + + bool timedout = true; + // A timeout will be forced when the IO counter reaches this value + int io_deadline_trigger = 0; + // Keep incrementing io_deadline_trigger and rerun the Seek()/Next() scan + // until there is an iteration that doesn't cause a timeout. 
This ensures that we cover + // all file reads in the iterator path that can potentially timeout + while (timedout) { + ReadOptions ro; + if (set_deadline) { + ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + } + if (set_timeout) { + ro.io_timeout = std::chrono::microseconds{5000}; + } + fs->SetDelayTrigger(ro.deadline, ro.io_timeout, io_deadline_trigger); + + block_cache->SetCapacity(0); + block_cache->SetCapacity(1048576); + + Iterator* iter = dbfull()->NewIterator(ro); + int count = 0; + iter->Seek("k50"); + while (iter->Valid() && count++ < 100) { + iter->Next(); + } + if (fs->TimedOut()) { + ASSERT_FALSE(iter->Valid()); + ASSERT_EQ(iter->status(), Status::TimedOut()); + } else { + timedout = false; + ASSERT_OK(iter->status()); + } + delete iter; + io_deadline_trigger++; + } + // Reset the delay sequence in order to avoid false alarms during Reopen + fs->SetDelayTrigger(std::chrono::microseconds::zero(), + std::chrono::microseconds::zero(), 0); + } + Close(); +} + +// Param 0: If true, set read_options.deadline +// Param 1: If true, set read_options.io_timeout +INSTANTIATE_TEST_CASE_P(DBBasicTestDeadline, DBBasicTestDeadline, + ::testing::Values(std::make_tuple(true, false), + std::make_tuple(false, true), + std::make_tuple(true, true))); } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 65a556910..bf4bf0784 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2708,10 +2708,6 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, return NewErrorIterator( Status::NotSupported("Managed iterator is not supported anymore.")); } - if (read_options.deadline != std::chrono::microseconds::zero()) { - return NewErrorIterator( - Status::NotSupported("ReadOptions deadline is not supported")); - } Iterator* result = nullptr; if (read_options.read_tier == kPersistedTier) { return NewErrorIterator(Status::NotSupported( 
diff --git a/file/file_util.h b/file/file_util.h index 1f77b760a..17b058038 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -45,11 +45,18 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env, if (ro.deadline.count()) { std::chrono::microseconds now = std::chrono::microseconds(env->NowMicros()); - if (now > ro.deadline) { + // Ensure there is at least 1us available. We don't want to pass a value of + // 0 as that means no timeout + if (now >= ro.deadline) { return IOStatus::TimedOut("Deadline exceeded"); } opts.timeout = ro.deadline - now; } + + if (ro.io_timeout.count() && + (!opts.timeout.count() || ro.io_timeout < opts.timeout)) { + opts.timeout = ro.io_timeout; + } return IOStatus::OK(); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b0880e144..2186d1e7d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1347,7 +1347,8 @@ struct ReadOptions { const Slice* timestamp; const Slice* iter_start_ts; - // Deadline for completing the read request (only Get/MultiGet for now) in us. + // Deadline for completing an API call (Get/MultiGet/Seek/Next for now) + // in microseconds. // It should be set to microseconds since epoch, i.e, gettimeofday or // equivalent plus allowed duration in microseconds. The best way is to use // env->NowMicros() + some timeout. @@ -1357,6 +1358,12 @@ struct ReadOptions { // processing a batch std::chrono::microseconds deadline; + // A timeout in microseconds to be passed to the underlying FileSystem for + // reads. As opposed to deadline, this determines the timeout for each + // individual file read request. If a MultiGet/Get/Seek/Next etc call + // results in multiple reads, each read can last up to io_timeout us. + std::chrono::microseconds io_timeout; + // It limits the maximum cumulative value size of the keys in batch while // reading through MultiGet. 
Once the cumulative value size exceeds this // soft limit then all the remaining keys are returned with status Aborted. diff --git a/options/options.cc b/options/options.cc index 599886d4e..d2b8d5d4c 100644 --- a/options/options.cc +++ b/options/options.cc @@ -613,6 +613,7 @@ ReadOptions::ReadOptions() timestamp(nullptr), iter_start_ts(nullptr), deadline(std::chrono::microseconds::zero()), + io_timeout(std::chrono::microseconds::zero()), value_size_soft_limit(std::numeric_limits::max()) {} ReadOptions::ReadOptions(bool cksum, bool cache) @@ -636,6 +637,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) timestamp(nullptr), iter_start_ts(nullptr), deadline(std::chrono::microseconds::zero()), + io_timeout(std::chrono::microseconds::zero()), value_size_soft_limit(std::numeric_limits::max()) {} } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index ddbec3560..66b4c6ddb 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -595,12 +595,14 @@ Status BlockBasedTable::Open( Footer footer; std::unique_ptr prefetch_buffer; - // Only retain read_options.deadline. In future, we may retain more + // Only retain read_options.deadline and read_options.io_timeout. + // In future, we may retain more // options. Specifically, w ignore verify_checksums and default to // checksum verification anyway when creating the index and filter // readers. 
ReadOptions ro; ro.deadline = read_options.deadline; + ro.io_timeout = read_options.io_timeout; // prefetch both index and filters, down to all partitions const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0; diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc index e8cfd0cf6..c120c26ea 100644 --- a/table/block_based/partitioned_index_reader.cc +++ b/table/block_based/partitioned_index_reader.cc @@ -78,6 +78,7 @@ InternalIteratorBase* PartitionIndexReader::NewIterator( ReadOptions ro; ro.fill_cache = read_options.fill_cache; ro.deadline = read_options.deadline; + ro.io_timeout = read_options.io_timeout; // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. std::unique_ptr> index_iter(