From 9a5886bd8c71aaa4466e0a8b1d7d1f04ae452b6a Mon Sep 17 00:00:00 2001 From: Anand Ananthabhotla Date: Mon, 29 Jun 2020 14:51:57 -0700 Subject: [PATCH] Extend Get/MultiGet deadline support to table open (#6982) Summary: Current implementation of the ```read_options.deadline``` option only checks the deadline for random file reads during point lookups. This PR extends the checks to file opens, prefetches and preloads as part of table open. The main changes are in the ```BlockBasedTable```, partitioned index and filter readers, and ```TableCache``` to take ReadOptions as an additional parameter. In ```BlockBasedTable::Open```, in order to retain existing behavior w.r.t checksum verification and block cache usage, we filter out most of the options in ```ReadOptions``` except ```deadline```. However, having the ```ReadOptions``` gives us more flexibility to honor other options like verify_checksums, fill_cache etc. in the future. Additional changes in callsites due to function signature changes in ```NewTableReader()``` and ```FilePrefetchBuffer```. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6982 Test Plan: Add new unit tests in db_basic_test Reviewed By: riversand963 Differential Revision: D22219515 Pulled By: anand1976 fbshipit-source-id: 8a3b92f4a889808013838603aa3ca35229cd501b --- db/db_basic_test.cc | 402 ++++++++++++------ db/db_impl/db_impl.cc | 2 - db/plain_table_db_test.cc | 3 +- db/table_cache.cc | 58 +-- db/table_cache.h | 4 +- db/version_builder.cc | 7 +- file/file_prefetch_buffer.cc | 13 +- file/file_prefetch_buffer.h | 10 +- file/file_util.cc | 3 +- file/random_access_file_reader.h | 2 + include/rocksdb/table.h | 14 +- table/adaptive/adaptive_table_factory.cc | 10 +- table/adaptive/adaptive_table_factory.h | 3 +- .../block_based/binary_search_index_reader.cc | 8 +- .../block_based/binary_search_index_reader.h | 2 +- table/block_based/block_based_filter_block.cc | 12 +- table/block_based/block_based_filter_block.h | 6 +- .../block_based/block_based_table_factory.cc | 4 +- table/block_based/block_based_table_factory.h | 3 +- table/block_based/block_based_table_reader.cc | 128 +++--- table/block_based/block_based_table_reader.h | 34 +- .../block_based_table_reader_test.cc | 3 +- table/block_based/filter_block.h | 2 +- table/block_based/full_filter_block.cc | 12 +- table/block_based/full_filter_block.h | 6 +- table/block_based/hash_index_reader.cc | 3 +- table/block_based/hash_index_reader.h | 2 +- table/block_based/partitioned_filter_block.cc | 31 +- table/block_based/partitioned_filter_block.h | 8 +- table/block_based/partitioned_index_reader.cc | 22 +- table/block_based/partitioned_index_reader.h | 4 +- .../block_based/uncompression_dict_reader.cc | 10 +- table/block_based/uncompression_dict_reader.h | 6 +- table/block_fetcher.cc | 21 +- table/block_fetcher_test.cc | 11 +- table/cuckoo/cuckoo_table_factory.cc | 2 +- table/cuckoo/cuckoo_table_factory.h | 3 +- table/format.cc | 15 +- table/format.h | 2 +- table/meta_blocks.cc | 38 +- table/meta_blocks.h | 3 +- 
table/mock_table.cc | 1 + table/mock_table.h | 3 +- table/plain/plain_table_factory.cc | 2 +- table/plain/plain_table_factory.h | 6 +- table/sst_file_dumper.cc | 6 +- table/table_test.cc | 16 +- utilities/options/options_util_test.cc | 2 + 48 files changed, 608 insertions(+), 360 deletions(-) diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index badcacc2d..e9584b568 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -7,6 +7,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include + #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/merge_operator.h" @@ -2801,132 +2803,176 @@ INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO, ::testing::Bool(), ::testing::Bool(), ::testing::Values(1, 4))); +// Forward declaration +class DeadlineFS; + +class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper { + public: + DeadlineRandomAccessFile(DeadlineFS& fs, SpecialEnv* env, + std::unique_ptr& file) + : FSRandomAccessFileWrapper(file.get()), + fs_(fs), + file_(std::move(file)), + env_(env) {} + + IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts, + Slice* result, char* scratch, + IODebugContext* dbg) const override; + + IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, + const IOOptions& options, IODebugContext* dbg) override; + + private: + DeadlineFS& fs_; + std::unique_ptr file_; + SpecialEnv* env_; +}; + +class DeadlineFS : public FileSystemWrapper { + public: + explicit DeadlineFS(SpecialEnv* env) + : FileSystemWrapper(FileSystem::Default()), + delay_idx_(0), + deadline_(std::chrono::microseconds::zero()), + env_(env), + timedout_(false), + ignore_deadline_(false) {} + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + std::unique_ptr file; + IOStatus s; + + s = 
target()->NewRandomAccessFile(fname, opts, &file, dbg); + result->reset(new DeadlineRandomAccessFile(*this, env_, file)); + + int delay; + const std::chrono::microseconds deadline = GetDeadline(); + if (deadline.count()) { + AssertDeadline(deadline, opts.io_options); + } + if (ShouldDelay(&delay, &s)) { + env_->SleepForMicroseconds(delay); + } + return s; + } + + // Set a vector of {IO counter, delay in microseconds, return status} tuples + // that control when to inject a delay and duration of the delay + void SetDelaySequence( + const std::chrono::microseconds deadline, + const std::vector>&& seq) { + int total_delay = 0; + for (auto& seq_iter : seq) { + // Ensure no individual delay is > 500ms + ASSERT_LT(std::get<1>(seq_iter), 500000); + total_delay += std::get<1>(seq_iter); + } + // ASSERT total delay is < 1s. This is mainly to keep the test from + // timing out in CI test frameworks + ASSERT_LT(total_delay, 1000000); + delay_seq_ = seq; + delay_idx_ = 0; + io_count_ = 0; + deadline_ = deadline; + timedout_ = false; + } + + // Increment the IO counter and return a delay in microseconds + bool ShouldDelay(int* delay, IOStatus* s) { + if (!ignore_deadline_ && delay_idx_ < delay_seq_.size() && + std::get<0>(delay_seq_[delay_idx_]) == io_count_++) { + *delay = std::get<1>(delay_seq_[delay_idx_]); + *s = std::get<2>(delay_seq_[delay_idx_]); + delay_idx_++; + timedout_ = true; + return true; + } + *s = IOStatus::OK(); + return false; + } + + const std::chrono::microseconds GetDeadline() { + return ignore_deadline_ ? 
std::chrono::microseconds::zero() : deadline_; + } + + bool TimedOut() { return timedout_; } + + void IgnoreDeadline(bool ignore) { ignore_deadline_ = ignore; } + + void AssertDeadline(const std::chrono::microseconds deadline, + const IOOptions& opts) const { + // Give a leeway of +- 10us as it can take some time for the Get/ + // MultiGet call to reach here, in order to avoid false alarms + std::chrono::microseconds now = + std::chrono::microseconds(env_->NowMicros()); + if (deadline - now != opts.timeout) { + ASSERT_EQ(deadline - now, opts.timeout); + } + } + + private: + std::vector> delay_seq_; + size_t delay_idx_; + int io_count_; + std::chrono::microseconds deadline_; + SpecialEnv* env_; + bool timedout_; + bool ignore_deadline_; +}; + +IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len, + const IOOptions& opts, Slice* result, + char* scratch, + IODebugContext* dbg) const { + int delay; + const std::chrono::microseconds deadline = fs_.GetDeadline(); + IOStatus s; + if (deadline.count()) { + fs_.AssertDeadline(deadline, opts); + } + if (fs_.ShouldDelay(&delay, &s)) { + env_->SleepForMicroseconds(delay); + } + if (s.ok()) { + s = FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch, + dbg); + } + return s; +} + +IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs, + size_t num_reqs, + const IOOptions& options, + IODebugContext* dbg) { + int delay; + const std::chrono::microseconds deadline = fs_.GetDeadline(); + IOStatus s; + if (deadline.count()) { + fs_.AssertDeadline(deadline, options); + } + if (fs_.ShouldDelay(&delay, &s)) { + env_->SleepForMicroseconds(delay); + } + if (s.ok()) { + s = FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg); + } + return s; +} + // A test class for intercepting random reads and injecting artificial // delays. 
Used for testing the deadline/timeout feature class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { public: DBBasicTestMultiGetDeadline() - : DBBasicTestMultiGet("db_basic_test_multiget_deadline" /*Test dir*/, - 10 /*# of column families*/, - false /*compressed cache enabled*/, - true /*uncompressed cache enabled*/, - true /*compression enabled*/, - true /*ReadOptions.fill_cache*/, - 1 /*# of parallel compression threads*/) {} - - // Forward declaration - class DeadlineFS; - - class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper { - public: - DeadlineRandomAccessFile(DeadlineFS& fs, SpecialEnv* env, - std::unique_ptr& file) - : FSRandomAccessFileWrapper(file.get()), - fs_(fs), - file_(std::move(file)), - env_(env) {} - - IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts, - Slice* result, char* scratch, IODebugContext* dbg) const override { - int delay; - const std::chrono::microseconds deadline = fs_.GetDeadline(); - if (deadline.count()) { - AssertDeadline(deadline, opts); - } - if (fs_.ShouldDelay(&delay)) { - env_->SleepForMicroseconds(delay); - } - return FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch, - dbg); - } - - IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, - const IOOptions& options, IODebugContext* dbg) override { - int delay; - const std::chrono::microseconds deadline = fs_.GetDeadline(); - if (deadline.count()) { - AssertDeadline(deadline, options); - } - if (fs_.ShouldDelay(&delay)) { - env_->SleepForMicroseconds(delay); - } - return FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg); - } - - private: - void AssertDeadline(const std::chrono::microseconds deadline, - const IOOptions& opts) const { - // Give a leeway of +- 10us as it can take some time for the Get/ - // MultiGet call to reach here, in order to avoid false alarms - std::chrono::microseconds now = - std::chrono::microseconds(env_->NowMicros()); - ASSERT_EQ(deadline - now, opts.timeout); - } - 
DeadlineFS& fs_; - std::unique_ptr file_; - SpecialEnv* env_; - }; - - class DeadlineFS : public FileSystemWrapper { - public: - DeadlineFS(SpecialEnv* env) - : FileSystemWrapper(FileSystem::Default()), - delay_idx_(0), - deadline_(std::chrono::microseconds::zero()), - env_(env) {} - ~DeadlineFS() = default; - - IOStatus NewRandomAccessFile(const std::string& fname, - const FileOptions& opts, - std::unique_ptr* result, - IODebugContext* dbg) override { - std::unique_ptr file; - IOStatus s; - - s = target()->NewRandomAccessFile(fname, opts, &file, dbg); - result->reset(new DeadlineRandomAccessFile(*this, env_, file)); - return s; - } - - // Set a vector of {IO counter, delay in microseconds} pairs that control - // when to inject a delay and duration of the delay - void SetDelaySequence(const std::chrono::microseconds deadline, - const std::vector>&& seq) { - int total_delay = 0; - for (auto& seq_iter : seq) { - // Ensure no individual delay is > 500ms - ASSERT_LT(seq_iter.second, 500000); - total_delay += seq_iter.second; - } - // ASSERT total delay is < 1s. 
This is mainly to keep the test from - // timing out in CI test frameworks - ASSERT_LT(total_delay, 1000000); - delay_seq_ = seq; - delay_idx_ = 0; - io_count_ = 0; - deadline_ = deadline; - } - - // Increment the IO counter and return a delay in microseconds - bool ShouldDelay(int* delay) { - if (delay_idx_ < delay_seq_.size() && - delay_seq_[delay_idx_].first == io_count_++) { - *delay = delay_seq_[delay_idx_].second; - delay_idx_++; - return true; - } - return false; - } - - const std::chrono::microseconds GetDeadline() { return deadline_; } - - private: - std::vector> delay_seq_; - size_t delay_idx_; - int io_count_; - std::chrono::microseconds deadline_; - SpecialEnv* env_; - }; + : DBBasicTestMultiGet( + "db_basic_test_multiget_deadline" /*Test dir*/, + 10 /*# of column families*/, false /*compressed cache enabled*/, + true /*uncompressed cache enabled*/, true /*compression enabled*/, + true /*ReadOptions.fill_cache*/, + 1 /*# of parallel compression threads*/) {} inline void CheckStatus(std::vector& statuses, size_t num_ok) { for (size_t i = 0; i < statuses.size(); ++i) { @@ -2940,8 +2986,7 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { }; TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { - std::shared_ptr fs( - new DBBasicTestMultiGetDeadline::DeadlineFS(env_)); + std::shared_ptr fs = std::make_shared(env_); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); Options options = CurrentOptions(); env_->SetTimeElapseOnlySleep(&options); @@ -2972,7 +3017,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { ReadOptions ro; ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; // Delay the first IO by 200ms - fs->SetDelaySequence(ro.deadline, {{0, 20000}}); + fs->SetDelaySequence( + ro.deadline, {std::tuple{0, 20000, IOStatus::OK()}}); std::vector statuses = dbfull()->MultiGet(ro, cfs, keys, &values); // The first key is successful because we check after the lookup, but @@ -2997,7 +3043,8 @@ 
TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { keys[i] = Slice(key_str[i].data(), key_str[i].size()); } ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence(ro.deadline, {{1, 20000}}); + fs->SetDelaySequence( + ro.deadline, {std::tuple{1, 20000, IOStatus::OK()}}); statuses = dbfull()->MultiGet(ro, cfs, keys, &values); CheckStatus(statuses, 3); @@ -3011,7 +3058,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { statuses.clear(); statuses.resize(keys.size()); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence(ro.deadline, {{0, 20000}}); + fs->SetDelaySequence( + ro.deadline, {std::tuple{0, 20000, IOStatus::OK()}}); dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); CheckStatus(statuses, 2); @@ -3026,7 +3074,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { statuses.clear(); statuses.resize(keys.size()); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence(ro.deadline, {{2, 20000}}); + fs->SetDelaySequence( + ro.deadline, {std::tuple{2, 20000, IOStatus::OK()}}); dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); CheckStatus(statuses, 6); @@ -3040,7 +3089,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { statuses.clear(); statuses.resize(keys.size()); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - fs->SetDelaySequence(ro.deadline, {{3, 20000}}); + fs->SetDelaySequence( + ro.deadline, {std::tuple{3, 20000, IOStatus::OK()}}); dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(), pin_values.data(), statuses.data()); CheckStatus(statuses, 8); @@ -3066,7 +3116,8 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { statuses.clear(); statuses.resize(keys.size()); ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; - 
fs->SetDelaySequence(ro.deadline, {{1, 20000}}); + fs->SetDelaySequence( + ro.deadline, {std::tuple{1, 20000, IOStatus::OK()}}); dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(), pin_values.data(), statuses.data()); CheckStatus(statuses, 64); @@ -3100,6 +3151,99 @@ TEST_F(DBBasicTest, ManifestWriteFailure) { Reopen(options); } +TEST_F(DBBasicTest, PointLookupDeadline) { + std::shared_ptr fs = std::make_shared(env_); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + // Since we call SetTimeElapseOnlySleep, Close() later on may not work + // properly for the DB that's opened by the DBTestBase constructor. + Close(); + for (int option_config = kDefault; option_config < kEnd; ++option_config) { + if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) { + continue; + } + option_config_ = option_config; + Options options = CurrentOptions(); + if (options.use_direct_reads) { + continue; + } + options.env = env.get(); + options.disable_auto_compactions = true; + Cache* block_cache = nullptr; + env_->SetTimeElapseOnlySleep(&options); + // Fileter block reads currently don't cause the request to get + // aborted on a read timeout, so its possible those block reads + // may get issued even if the deadline is past + SyncPoint::GetInstance()->SetCallBack( + "BlockBasedTable::Get:BeforeFilterMatch", + [&](void* /*arg*/) { fs->IgnoreDeadline(true); }); + SyncPoint::GetInstance()->SetCallBack( + "BlockBasedTable::Get:AfterFilterMatch", + [&](void* /*arg*/) { fs->IgnoreDeadline(false); }); + // DB open will create table readers unless we reduce the table cache + // capacity. + // SanitizeOptions will set max_open_files to minimum of 20. Table cache + // is allocated with max_open_files - 10 as capacity. So override + // max_open_files to 11 so table cache capacity will become 1. 
This will + // prevent file open during DB open and force the file to be opened + // during MultiGet + SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = (int*)arg; + *max_open_files = 11; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Reopen(options); + + if (options.table_factory && + !strcmp(options.table_factory->Name(), + BlockBasedTableFactory::kName.c_str())) { + BlockBasedTableFactory* bbtf = + static_cast(options.table_factory.get()); + block_cache = bbtf->table_options().block_cache.get(); + } + + Random rnd(301); + for (int i = 0; i < 400; ++i) { + std::string key = "k" + ToString(i); + Put(key, RandomString(&rnd, 100)); + } + Flush(); + + bool timedout = true; + // A timeout will be forced when the IO counter reaches this value + int io_deadline_trigger = 0; + // Keep incrementing io_deadline_trigger and call Get() until there is an + // iteration that doesn't cause a timeout. This ensures that we cover + // all file reads in the point lookup path that can potentially timeout + // and cause the Get() to fail. 
+ while (timedout) { + ReadOptions ro; + ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + fs->SetDelaySequence( + ro.deadline, {std::tuple{ + io_deadline_trigger, 20000, IOStatus::TimedOut()}}); + + block_cache->SetCapacity(0); + block_cache->SetCapacity(1048576); + + std::string value; + Status s = dbfull()->Get(ro, "k50", &value); + if (fs->TimedOut()) { + ASSERT_EQ(s, Status::TimedOut()); + } else { + timedout = false; + ASSERT_OK(s); + } + io_deadline_trigger++; + } + // Reset the delay sequence in order to avoid false alarms during Reopen + fs->SetDelaySequence(std::chrono::microseconds::zero(), {}); + } + Close(); +} + } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 1c3ffb752..0f006e277 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2685,8 +2685,6 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, return NewErrorIterator( Status::NotSupported("Managed iterator is not supported anymore.")); } - // We will eventually support deadline for iterators too, but safeguard - // for now if (read_options.deadline != std::chrono::microseconds::zero()) { return NewErrorIterator( Status::NotSupported("ReadOptions deadline is not supported")); diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index d86d54db0..f61a280ad 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -336,8 +336,9 @@ class TestPlainTableFactory : public PlainTableFactory { column_family_id_(column_family_id), column_family_name_(std::move(column_family_name)) {} + using PlainTableFactory::NewTableReader; Status NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& /*ro*/, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table, bool /*prefetch_index_and_filter_in_cache*/) const override { diff --git 
a/db/table_cache.cc b/db/table_cache.cc index 6b29349f3..1ce82c1a4 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -13,6 +13,7 @@ #include "db/range_tombstone_fragmenter.h" #include "db/snapshot_impl.h" #include "db/version_edit.h" +#include "file/file_util.h" #include "file/filename.h" #include "file/random_access_file_reader.h" #include "monitoring/perf_context_imp.h" @@ -92,7 +93,7 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) { } Status TableCache::GetTableReader( - const FileOptions& file_options, + const ReadOptions& ro, const FileOptions& file_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, std::unique_ptr* table_reader, @@ -102,12 +103,19 @@ Status TableCache::GetTableReader( std::string fname = TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId()); std::unique_ptr file; - Status s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file, - nullptr); + FileOptions fopts = file_options; + Status s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options); + if (s.ok()) { + s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr); + } RecordTick(ioptions_.statistics, NO_FILE_OPENS); if (s.IsPathNotFound()) { fname = Rocks2LevelTableFileName(fname); - s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file, nullptr); + s = PrepareIOFromReadOptions(ro, ioptions_.env, fopts.io_options); + if (s.ok()) { + s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file, + nullptr); + } RecordTick(ioptions_.statistics, NO_FILE_OPENS); } @@ -122,6 +130,7 @@ Status TableCache::GetTableReader( record_read_stats ? 
ioptions_.statistics : nullptr, SST_READ_MICROS, file_read_hist, ioptions_.rate_limiter, ioptions_.listeners)); s = ioptions_.table_factory->NewTableReader( + ro, TableReaderOptions(ioptions_, prefix_extractor, file_options, internal_comparator, skip_filters, immortal_tables_, false /* force_direct_prefetch */, level, @@ -141,7 +150,8 @@ void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) { cache_->Erase(key); } -Status TableCache::FindTable(const FileOptions& file_options, +Status TableCache::FindTable(const ReadOptions& ro, + const FileOptions& file_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, Cache::Handle** handle, const SliceTransform* prefix_extractor, @@ -169,7 +179,7 @@ Status TableCache::FindTable(const FileOptions& file_options, } std::unique_ptr table_reader; - s = GetTableReader(file_options, internal_comparator, fd, + s = GetTableReader(ro, file_options, internal_comparator, fd, false /* sequential mode */, record_read_stats, file_read_hist, &table_reader, prefix_extractor, skip_filters, level, prefetch_index_and_filter_in_cache, @@ -212,12 +222,12 @@ InternalIterator* TableCache::NewIterator( auto& fd = file_meta.fd; table_reader = fd.table_reader; if (table_reader == nullptr) { - s = FindTable(file_options, icomparator, fd, &handle, prefix_extractor, - options.read_tier == kBlockCacheTier /* no_io */, - !for_compaction /* record_read_stats */, file_read_hist, - skip_filters, level, - true /* prefetch_index_and_filter_in_cache */, - max_file_size_for_l0_meta_pin); + s = FindTable( + options, file_options, icomparator, fd, &handle, prefix_extractor, + options.read_tier == kBlockCacheTier /* no_io */, + !for_compaction /* record_read_stats */, file_read_hist, skip_filters, + level, true /* prefetch_index_and_filter_in_cache */, + max_file_size_for_l0_meta_pin); if (s.ok()) { table_reader = GetTableReaderFromHandle(handle); } @@ -288,7 +298,7 @@ Status 
TableCache::GetRangeTombstoneIterator( TableReader* t = fd.table_reader; Cache::Handle* handle = nullptr; if (t == nullptr) { - s = FindTable(file_options_, internal_comparator, fd, &handle); + s = FindTable(options, file_options_, internal_comparator, fd, &handle); if (s.ok()) { t = GetTableReaderFromHandle(handle); } @@ -403,7 +413,7 @@ Status TableCache::Get(const ReadOptions& options, Cache::Handle* handle = nullptr; if (!done && s.ok()) { if (t == nullptr) { - s = FindTable(file_options_, internal_comparator, fd, &handle, + s = FindTable(options, file_options_, internal_comparator, fd, &handle, prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */, true /* record_read_stats */, file_read_hist, skip_filters, @@ -506,8 +516,8 @@ Status TableCache::MultiGet(const ReadOptions& options, if (s.ok() && !table_range.empty()) { if (t == nullptr) { s = FindTable( - file_options_, internal_comparator, fd, &handle, prefix_extractor, - options.read_tier == kBlockCacheTier /* no_io */, + options, file_options_, internal_comparator, fd, &handle, + prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */, true /* record_read_stats */, file_read_hist, skip_filters, level); TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s); if (s.ok()) { @@ -591,8 +601,8 @@ Status TableCache::GetTableProperties( } Cache::Handle* table_handle = nullptr; - s = FindTable(file_options, internal_comparator, fd, &table_handle, - prefix_extractor, no_io); + s = FindTable(ReadOptions(), file_options, internal_comparator, fd, + &table_handle, prefix_extractor, no_io); if (!s.ok()) { return s; } @@ -615,8 +625,8 @@ size_t TableCache::GetMemoryUsageByTableReader( } Cache::Handle* table_handle = nullptr; - s = FindTable(file_options, internal_comparator, fd, &table_handle, - prefix_extractor, true); + s = FindTable(ReadOptions(), file_options, internal_comparator, fd, + &table_handle, prefix_extractor, true); if (!s.ok()) { return 0; } @@ -640,8 +650,8 @@ uint64_t 
TableCache::ApproximateOffsetOf( Cache::Handle* table_handle = nullptr; if (table_reader == nullptr) { const bool for_compaction = (caller == TableReaderCaller::kCompaction); - Status s = FindTable(file_options_, internal_comparator, fd, &table_handle, - prefix_extractor, false /* no_io */, + Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd, + &table_handle, prefix_extractor, false /* no_io */, !for_compaction /* record_read_stats */); if (s.ok()) { table_reader = GetTableReaderFromHandle(table_handle); @@ -667,8 +677,8 @@ uint64_t TableCache::ApproximateSize( Cache::Handle* table_handle = nullptr; if (table_reader == nullptr) { const bool for_compaction = (caller == TableReaderCaller::kCompaction); - Status s = FindTable(file_options_, internal_comparator, fd, &table_handle, - prefix_extractor, false /* no_io */, + Status s = FindTable(ReadOptions(), file_options_, internal_comparator, fd, + &table_handle, prefix_extractor, false /* no_io */, !for_compaction /* record_read_stats */); if (s.ok()) { table_reader = GetTableReaderFromHandle(table_handle); diff --git a/db/table_cache.h b/db/table_cache.h index 35b432c6b..5c5ce1e78 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -129,7 +129,7 @@ class TableCache { // Find table reader // @param skip_filters Disables loading/accessing the filter block // @param level == -1 means not specified - Status FindTable(const FileOptions& toptions, + Status FindTable(const ReadOptions& ro, const FileOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, Cache::Handle**, const SliceTransform* prefix_extractor = nullptr, @@ -195,7 +195,7 @@ class TableCache { private: // Build a table reader - Status GetTableReader(const FileOptions& file_options, + Status GetTableReader(const ReadOptions& ro, const FileOptions& file_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, bool record_read_stats, 
HistogramImpl* file_read_hist, diff --git a/db/version_builder.cc b/db/version_builder.cc index b94dc9320..da746788e 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -931,9 +931,10 @@ class VersionBuilder::Rep { auto* file_meta = files_meta[file_idx].first; int level = files_meta[file_idx].second; statuses[file_idx] = table_cache_->FindTable( - file_options_, *(base_vstorage_->InternalComparator()), - file_meta->fd, &file_meta->table_reader_handle, prefix_extractor, - false /*no_io */, true /* record_read_stats */, + ReadOptions(), file_options_, + *(base_vstorage_->InternalComparator()), file_meta->fd, + &file_meta->table_reader_handle, prefix_extractor, false /*no_io */, + true /* record_read_stats */, internal_stats->GetFileReadHist(level), false, level, prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin); if (file_meta->table_reader_handle != nullptr) { diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 5352417ad..bccc59fc0 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -21,7 +21,8 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { -Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, +Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, + RandomAccessFileReader* reader, uint64_t offset, size_t n, bool for_compaction) { if (!enable_ || reader == nullptr) { @@ -87,7 +88,7 @@ Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, Slice result; size_t read_len = static_cast(roundup_len - chunk_len); - s = reader->Read(IOOptions(), rounddown_offset + chunk_len, read_len, &result, + s = reader->Read(opts, rounddown_offset + chunk_len, read_len, &result, buffer_.BufferStart() + chunk_len, nullptr, for_compaction); #ifndef NDEBUG if (!s.ok() || result.size() < read_len) { @@ -103,7 +104,8 @@ Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, return s; } -bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, 
size_t n, +bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, + uint64_t offset, size_t n, Slice* result, bool for_compaction) { if (track_min_offset_ && offset < min_offset_read_) { min_offset_read_ = static_cast(offset); @@ -122,10 +124,11 @@ bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n, assert(max_readahead_size_ >= readahead_size_); Status s; if (for_compaction) { - s = Prefetch(file_reader_, offset, std::max(n, readahead_size_), + s = Prefetch(opts, file_reader_, offset, std::max(n, readahead_size_), for_compaction); } else { - s = Prefetch(file_reader_, offset, n + readahead_size_, for_compaction); + s = Prefetch(opts, file_reader_, offset, n + readahead_size_, + for_compaction); } if (!s.ok()) { return false; diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index d53f627b5..d8e9c0ff6 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -11,9 +11,11 @@ #include #include #include + #include "file/random_access_file_reader.h" #include "port/port.h" #include "rocksdb/env.h" +#include "rocksdb/options.h" #include "util/aligned_buffer.h" namespace ROCKSDB_NAMESPACE { @@ -59,8 +61,8 @@ class FilePrefetchBuffer { // offset : the file offset to start reading from. // n : the number of bytes to read. // for_compaction : if prefetch is done for compaction read. - Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n, - bool for_compaction = false); + Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, + uint64_t offset, size_t n, bool for_compaction = false); // Tries returning the data for a file raed from this buffer, if that data is // in the buffer. @@ -72,8 +74,8 @@ class FilePrefetchBuffer { // n : the number of bytes. // result : output buffer to put the data into. // for_compaction : if cache read is done for compaction read. 
- bool TryReadFromCache(uint64_t offset, size_t n, Slice* result, - bool for_compaction = false); + bool TryReadFromCache(const IOOptions& opts, uint64_t offset, size_t n, + Slice* result, bool for_compaction = false); // The minimum `offset` ever passed to TryReadFromCache(). This will nly be // tracked if track_min_offset = true. diff --git a/file/file_util.cc b/file/file_util.cc index 603b22937..70178a0bd 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -166,10 +166,11 @@ IOStatus GenerateOneFileChecksum(FileSystem* fs, const std::string& file_path, Slice slice; uint64_t offset = 0; + IOOptions opts; while (size > 0) { size_t bytes_to_read = static_cast(std::min(uint64_t{readahead_size}, size)); - if (!prefetch_buffer.TryReadFromCache(offset, bytes_to_read, &slice, + if (!prefetch_buffer.TryReadFromCache(opts, offset, bytes_to_read, &slice, false)) { return IOStatus::Corruption("file read failed"); } diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index f1c9e7018..f0f6801f6 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -11,10 +11,12 @@ #include #include #include + #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/file_system.h" #include "rocksdb/listener.h" +#include "rocksdb/options.h" #include "rocksdb/rate_limiter.h" #include "util/aligned_buffer.h" diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index adec1aa4c..95cdb8d21 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -556,7 +556,19 @@ class TableFactory { const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table_reader, - bool prefetch_index_and_filter_in_cache = true) const = 0; + bool prefetch_index_and_filter_in_cache = true) const { + ReadOptions ro; + return NewTableReader(ro, table_reader_options, std::move(file), file_size, + table_reader, prefetch_index_and_filter_in_cache); + } + + // Overload of the above 
function that allows the caller to pass in a + // ReadOptions + virtual Status NewTableReader( + const ReadOptions& ro, const TableReaderOptions& table_reader_options, + std::unique_ptr&& file, uint64_t file_size, + std::unique_ptr* table_reader, + bool prefetch_index_and_filter_in_cache) const = 0; // Return a table builder to write to a file for this table type. // diff --git a/table/adaptive/adaptive_table_factory.cc b/table/adaptive/adaptive_table_factory.cc index fa94e7bcd..98381ee81 100644 --- a/table/adaptive/adaptive_table_factory.cc +++ b/table/adaptive/adaptive_table_factory.cc @@ -42,12 +42,13 @@ extern const uint64_t kLegacyBlockBasedTableMagicNumber; extern const uint64_t kCuckooTableMagicNumber; Status AdaptiveTableFactory::NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& ro, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table, - bool /*prefetch_index_and_filter_in_cache*/) const { + bool prefetch_index_and_filter_in_cache) const { Footer footer; - auto s = ReadFooterFromFile(file.get(), nullptr /* prefetch_buffer */, + IOOptions opts; + auto s = ReadFooterFromFile(opts, file.get(), nullptr /* prefetch_buffer */, file_size, &footer); if (!s.ok()) { return s; @@ -59,7 +60,8 @@ Status AdaptiveTableFactory::NewTableReader( } else if (footer.table_magic_number() == kBlockBasedTableMagicNumber || footer.table_magic_number() == kLegacyBlockBasedTableMagicNumber) { return block_based_table_factory_->NewTableReader( - table_reader_options, std::move(file), file_size, table); + ro, table_reader_options, std::move(file), file_size, table, + prefetch_index_and_filter_in_cache); } else if (footer.table_magic_number() == kCuckooTableMagicNumber) { return cuckoo_table_factory_->NewTableReader( table_reader_options, std::move(file), file_size, table); diff --git a/table/adaptive/adaptive_table_factory.h b/table/adaptive/adaptive_table_factory.h index 
fcc4c682c..74d10dba0 100644 --- a/table/adaptive/adaptive_table_factory.h +++ b/table/adaptive/adaptive_table_factory.h @@ -33,8 +33,9 @@ class AdaptiveTableFactory : public TableFactory { const char* Name() const override { return "AdaptiveTableFactory"; } + using TableFactory::NewTableReader; Status NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& ro, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table, bool prefetch_index_and_filter_in_cache = true) const override; diff --git a/table/block_based/binary_search_index_reader.cc b/table/block_based/binary_search_index_reader.cc index 8c938c924..8a2b72963 100644 --- a/table/block_based/binary_search_index_reader.cc +++ b/table/block_based/binary_search_index_reader.cc @@ -10,9 +10,9 @@ namespace ROCKSDB_NAMESPACE { Status BinarySearchIndexReader::Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context, std::unique_ptr* index_reader) { assert(table != nullptr); assert(table->get_rep()); @@ -22,7 +22,7 @@ Status BinarySearchIndexReader::Create( CachableEntry index_block; if (prefetch || !use_cache) { const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + ReadIndexBlock(table, prefetch_buffer, ro, use_cache, /*get_context=*/nullptr, lookup_context, &index_block); if (!s.ok()) { return s; diff --git a/table/block_based/binary_search_index_reader.h b/table/block_based/binary_search_index_reader.h index e8a05d51e..d4a611ecc 100644 --- a/table/block_based/binary_search_index_reader.h +++ b/table/block_based/binary_search_index_reader.h @@ -19,7 +19,7 @@ class BinarySearchIndexReader : public 
BlockBasedTable::IndexReaderCommon { // `BinarySearchIndexReader`. // On success, index_reader will be populated; otherwise it will remain // unmodified. - static Status Create(const BlockBasedTable* table, + static Status Create(const BlockBasedTable* table, const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, bool pin, BlockCacheLookupContext* lookup_context, diff --git a/table/block_based/block_based_filter_block.cc b/table/block_based/block_based_filter_block.cc index ea1d5f9c7..2e457e32f 100644 --- a/table/block_based/block_based_filter_block.cc +++ b/table/block_based/block_based_filter_block.cc @@ -171,18 +171,18 @@ BlockBasedFilterBlockReader::BlockBasedFilterBlockReader( } std::unique_ptr BlockBasedFilterBlockReader::Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context) { + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context) { assert(table); assert(table->get_rep()); assert(!pin || prefetch); CachableEntry filter_block; if (prefetch || !use_cache) { - const Status s = ReadFilterBlock(table, prefetch_buffer, ReadOptions(), - use_cache, nullptr /* get_context */, - lookup_context, &filter_block); + const Status s = ReadFilterBlock(table, prefetch_buffer, ro, use_cache, + nullptr /* get_context */, lookup_context, + &filter_block); if (!s.ok()) { IGNORE_STATUS_IF_ERROR(s); return std::unique_ptr(); diff --git a/table/block_based/block_based_filter_block.h b/table/block_based/block_based_filter_block.h index 01c98a70b..67ded1ee3 100644 --- a/table/block_based/block_based_filter_block.h +++ b/table/block_based/block_based_filter_block.h @@ -85,9 +85,9 @@ class BlockBasedFilterBlockReader void operator=(const BlockBasedFilterBlockReader&) = delete; static std::unique_ptr Create( - const 
BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context); + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context); bool IsBlockBased() override { return true; } diff --git a/table/block_based/block_based_table_factory.cc b/table/block_based/block_based_table_factory.cc index 18dddaf3c..267e6163b 100644 --- a/table/block_based/block_based_table_factory.cc +++ b/table/block_based/block_based_table_factory.cc @@ -412,12 +412,12 @@ BlockBasedTableFactory::BlockBasedTableFactory( } Status BlockBasedTableFactory::NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& ro, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table_reader, bool prefetch_index_and_filter_in_cache) const { return BlockBasedTable::Open( - table_reader_options.ioptions, table_reader_options.env_options, + ro, table_reader_options.ioptions, table_reader_options.env_options, table_options_, table_reader_options.internal_comparator, std::move(file), file_size, table_reader, table_reader_options.prefix_extractor, prefetch_index_and_filter_in_cache, table_reader_options.skip_filters, diff --git a/table/block_based/block_based_table_factory.h b/table/block_based/block_based_table_factory.h index b8b125658..4aff7c862 100644 --- a/table/block_based/block_based_table_factory.h +++ b/table/block_based/block_based_table_factory.h @@ -48,8 +48,9 @@ class BlockBasedTableFactory : public TableFactory { const char* Name() const override { return kName.c_str(); } + using TableFactory::NewTableReader; Status NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& ro, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* 
table_reader, bool prefetch_index_and_filter_in_cache = true) const override; diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index a6f8bdcaa..a3b9092d5 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -577,8 +577,8 @@ Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix, } Status BlockBasedTable::Open( - const ImmutableCFOptions& ioptions, const EnvOptions& env_options, - const BlockBasedTableOptions& table_options, + const ReadOptions& read_options, const ImmutableCFOptions& ioptions, + const EnvOptions& env_options, const BlockBasedTableOptions& table_options, const InternalKeyComparator& internal_comparator, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table_reader, @@ -595,12 +595,19 @@ Status BlockBasedTable::Open( Footer footer; std::unique_ptr prefetch_buffer; + // Only retain read_options.deadline. In future, we may retain more + // options. Specifically, we ignore verify_checksums and default to + // checksum verification anyway when creating the index and filter + // readers. + ReadOptions ro; + ro.deadline = read_options.deadline; + // prefetch both index and filters, down to all partitions const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0; const bool preload_all = !table_options.cache_index_and_filter_blocks; if (!ioptions.allow_mmap_reads) { - s = PrefetchTail(file.get(), file_size, force_direct_prefetch, + s = PrefetchTail(ro, file.get(), file_size, force_direct_prefetch, tail_prefetch_stats, prefetch_all, preload_all, &prefetch_buffer); } else { @@ -617,8 +624,12 @@ Status BlockBasedTable::Open( // 5. [meta block: compression dictionary] // 6. [meta block: index] // 7. 
[meta block: filter] - s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer, - kBlockBasedTableMagicNumber); + IOOptions opts; + s = PrepareIOFromReadOptions(ro, file->env(), opts); + if (s.ok()) { + s = ReadFooterFromFile(opts, file.get(), prefetch_buffer.get(), file_size, + &footer, kBlockBasedTableMagicNumber); + } if (!s.ok()) { return s; } @@ -664,7 +675,7 @@ Status BlockBasedTable::Open( // Read metaindex std::unique_ptr metaindex; std::unique_ptr metaindex_iter; - s = new_table->ReadMetaIndexBlock(prefetch_buffer.get(), &metaindex, + s = new_table->ReadMetaIndexBlock(ro, prefetch_buffer.get(), &metaindex, &metaindex_iter); if (!s.ok()) { return s; @@ -672,18 +683,19 @@ Status BlockBasedTable::Open( // Populates table_properties and some fields that depend on it, // such as index_type. - s = new_table->ReadPropertiesBlock(prefetch_buffer.get(), + s = new_table->ReadPropertiesBlock(ro, prefetch_buffer.get(), metaindex_iter.get(), largest_seqno); if (!s.ok()) { return s; } - s = new_table->ReadRangeDelBlock(prefetch_buffer.get(), metaindex_iter.get(), - internal_comparator, &lookup_context); + s = new_table->ReadRangeDelBlock(ro, prefetch_buffer.get(), + metaindex_iter.get(), internal_comparator, + &lookup_context); if (!s.ok()) { return s; } s = new_table->PrefetchIndexAndFilterBlocks( - prefetch_buffer.get(), metaindex_iter.get(), new_table.get(), + ro, prefetch_buffer.get(), metaindex_iter.get(), new_table.get(), prefetch_all, table_options, level, file_size, max_file_size_for_l0_meta_pin, &lookup_context); @@ -703,7 +715,7 @@ Status BlockBasedTable::Open( } Status BlockBasedTable::PrefetchTail( - RandomAccessFileReader* file, uint64_t file_size, + const ReadOptions& ro, RandomAccessFileReader* file, uint64_t file_size, bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all, const bool preload_all, std::unique_ptr* prefetch_buffer) { @@ -742,15 +754,19 @@ Status BlockBasedTable::PrefetchTail( } 
else { prefetch_buffer->reset(new FilePrefetchBuffer( nullptr, 0, 0, true /* enable */, true /* track_min_offset */)); - s = (*prefetch_buffer)->Prefetch(file, prefetch_off, prefetch_len); + IOOptions opts; + s = PrepareIOFromReadOptions(ro, file->env(), opts); + if (s.ok()) { + s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len); + } } return s; } Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno( - FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value, - TableProperties** table_properties) { + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, + const Slice& handle_value, TableProperties** table_properties) { assert(table_properties != nullptr); // If this is an external SST file ingested with write_global_seqno set to // true, then we expect the checksum mismatch because checksum was written @@ -760,7 +776,7 @@ Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno( // original value, i.e. 0, and verify the checksum again. BlockHandle props_block_handle; CacheAllocationPtr tmp_buf; - Status s = ReadProperties(handle_value, rep_->file.get(), prefetch_buffer, + Status s = ReadProperties(ro, handle_value, rep_->file.get(), prefetch_buffer, rep_->footer, rep_->ioptions, table_properties, false /* verify_checksum */, &props_block_handle, &tmp_buf, false /* compression_type_missing */, @@ -784,8 +800,8 @@ Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno( } Status BlockBasedTable::ReadPropertiesBlock( - FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, - const SequenceNumber largest_seqno) { + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter, const SequenceNumber largest_seqno) { bool found_properties_block = true; Status s; s = SeekToPropertiesBlock(meta_iter, &found_properties_block); @@ -799,16 +815,17 @@ Status BlockBasedTable::ReadPropertiesBlock( TableProperties* table_properties = nullptr; if (s.ok()) { s = ReadProperties( - meta_iter->value(), 
rep_->file.get(), prefetch_buffer, rep_->footer, - rep_->ioptions, &table_properties, true /* verify_checksum */, - nullptr /* ret_block_handle */, nullptr /* ret_block_contents */, + ro, meta_iter->value(), rep_->file.get(), prefetch_buffer, + rep_->footer, rep_->ioptions, &table_properties, + true /* verify_checksum */, nullptr /* ret_block_handle */, + nullptr /* ret_block_contents */, false /* compression_type_missing */, nullptr /* memory_allocator */); } IGNORE_STATUS_IF_ERROR(s); if (s.IsCorruption()) { - s = TryReadPropertiesWithGlobalSeqno(prefetch_buffer, meta_iter->value(), - &table_properties); + s = TryReadPropertiesWithGlobalSeqno( + ro, prefetch_buffer, meta_iter->value(), &table_properties); IGNORE_STATUS_IF_ERROR(s); } std::unique_ptr props_guard; @@ -883,7 +900,8 @@ Status BlockBasedTable::ReadPropertiesBlock( } Status BlockBasedTable::ReadRangeDelBlock( - FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, + const ReadOptions& read_options, FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter, const InternalKeyComparator& internal_comparator, BlockCacheLookupContext* lookup_context) { Status s; @@ -896,7 +914,6 @@ Status BlockBasedTable::ReadRangeDelBlock( "Error when seeking to range delete tombstones block from file: %s", s.ToString().c_str()); } else if (found_range_del_block && !range_del_handle.IsNull()) { - ReadOptions read_options; std::unique_ptr iter(NewDataBlockIterator( read_options, range_del_handle, /*input_iter=*/nullptr, BlockType::kRangeDeletion, @@ -919,8 +936,8 @@ Status BlockBasedTable::ReadRangeDelBlock( } Status BlockBasedTable::PrefetchIndexAndFilterBlocks( - FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, - BlockBasedTable* new_table, bool prefetch_all, + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter, BlockBasedTable* new_table, bool prefetch_all, const BlockBasedTableOptions& table_options, const int level, size_t file_size, size_t 
max_file_size_for_l0_meta_pin, BlockCacheLookupContext* lookup_context) { @@ -983,7 +1000,7 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( index_type == BlockBasedTableOptions::kTwoLevelIndexSearch); std::unique_ptr index_reader; - s = new_table->CreateIndexReader(prefetch_buffer, meta_iter, use_cache, + s = new_table->CreateIndexReader(ro, prefetch_buffer, meta_iter, use_cache, prefetch_index, pin_index, lookup_context, &index_reader); if (!s.ok()) { @@ -996,7 +1013,7 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( // are hence follow the configuration for pin and prefetch regardless of // the value of cache_index_and_filter_blocks if (prefetch_all) { - rep_->index_reader->CacheDependencies(pin_all); + rep_->index_reader->CacheDependencies(ro, pin_all); } // prefetch the first level of filter @@ -1013,12 +1030,12 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( if (rep_->filter_policy) { auto filter = new_table->CreateFilterBlockReader( - prefetch_buffer, use_cache, prefetch_filter, pin_filter, + ro, prefetch_buffer, use_cache, prefetch_filter, pin_filter, lookup_context); if (filter) { // Refer to the comment above about paritioned indexes always being cached if (prefetch_all) { - filter->CacheDependencies(pin_all); + filter->CacheDependencies(ro, pin_all); } rep_->filter = std::move(filter); @@ -1027,7 +1044,7 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( if (!rep_->compression_dict_handle.IsNull()) { std::unique_ptr uncompression_dict_reader; - s = UncompressionDictReader::Create(this, prefetch_buffer, use_cache, + s = UncompressionDictReader::Create(this, ro, prefetch_buffer, use_cache, prefetch_all, pin_all, lookup_context, &uncompression_dict_reader); if (!s.ok()) { @@ -1082,14 +1099,14 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const { // metaindex // block and its iterator. 
Status BlockBasedTable::ReadMetaIndexBlock( - FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, std::unique_ptr* metaindex_block, std::unique_ptr* iter) { // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates // it is an empty block. std::unique_ptr metaindex; Status s = ReadBlockFromFile( - rep_->file.get(), prefetch_buffer, rep_->footer, ReadOptions(), + rep_->file.get(), prefetch_buffer, rep_->footer, ro, rep_->footer.metaindex_handle(), &metaindex, rep_->ioptions, true /* decompress */, true /*maybe_compressed*/, BlockType::kMetaIndex, UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options, @@ -1317,8 +1334,8 @@ Status BlockBasedTable::PutDataBlockToCache( } std::unique_ptr BlockBasedTable::CreateFilterBlockReader( - FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, - bool pin, BlockCacheLookupContext* lookup_context) { + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, BlockCacheLookupContext* lookup_context) { auto& rep = rep_; auto filter_type = rep->filter_type; if (filter_type == Rep::FilterType::kNoFilter) { @@ -1330,14 +1347,14 @@ std::unique_ptr BlockBasedTable::CreateFilterBlockReader( switch (filter_type) { case Rep::FilterType::kPartitionedFilter: return PartitionedFilterBlockReader::Create( - this, prefetch_buffer, use_cache, prefetch, pin, lookup_context); + this, ro, prefetch_buffer, use_cache, prefetch, pin, lookup_context); case Rep::FilterType::kBlockFilter: return BlockBasedFilterBlockReader::Create( - this, prefetch_buffer, use_cache, prefetch, pin, lookup_context); + this, ro, prefetch_buffer, use_cache, prefetch, pin, lookup_context); case Rep::FilterType::kFullFilter: - return FullFilterBlockReader::Create(this, prefetch_buffer, use_cache, + return FullFilterBlockReader::Create(this, ro, prefetch_buffer, use_cache, prefetch, pin, lookup_context); default: @@ -2205,9 +2222,11 @@ Status 
BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, lookup_context.get_from_user_specified_snapshot = read_options.snapshot != nullptr; } + TEST_SYNC_POINT("BlockBasedTable::Get:BeforeFilterMatch"); const bool may_match = FullFilterKeyMayMatch(read_options, filter, key, no_io, prefix_extractor, get_context, &lookup_context); + TEST_SYNC_POINT("BlockBasedTable::Get:AfterFilterMatch"); if (!may_match) { RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL); PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level); @@ -2754,7 +2773,8 @@ Status BlockBasedTable::VerifyChecksum(const ReadOptions& read_options, // Check Meta blocks std::unique_ptr metaindex; std::unique_ptr metaindex_iter; - s = ReadMetaIndexBlock(nullptr /* prefetch buffer */, &metaindex, + ReadOptions ro; + s = ReadMetaIndexBlock(ro, nullptr /* prefetch buffer */, &metaindex, &metaindex_iter); if (s.ok()) { s = VerifyChecksumInMetaBlocks(metaindex_iter.get()); @@ -2878,7 +2898,8 @@ Status BlockBasedTable::VerifyChecksumInMetaBlocks( s = block_fetcher.ReadBlockContents(); if (s.IsCorruption() && meta_block_name == kPropertiesBlock) { TableProperties* table_properties; - s = TryReadPropertiesWithGlobalSeqno(nullptr /* prefetch_buffer */, + ReadOptions ro; + s = TryReadPropertiesWithGlobalSeqno(ro, nullptr /* prefetch_buffer */, index_iter->value(), &table_properties); delete table_properties; @@ -2931,7 +2952,7 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, // 4. internal_comparator // 5. 
index_type Status BlockBasedTable::CreateIndexReader( - FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, InternalIterator* preloaded_meta_index_iter, bool use_cache, bool prefetch, bool pin, BlockCacheLookupContext* lookup_context, std::unique_ptr* index_reader) { @@ -2943,16 +2964,16 @@ Status BlockBasedTable::CreateIndexReader( switch (rep_->index_type) { case BlockBasedTableOptions::kTwoLevelIndexSearch: { - return PartitionIndexReader::Create(this, prefetch_buffer, use_cache, + return PartitionIndexReader::Create(this, ro, prefetch_buffer, use_cache, prefetch, pin, lookup_context, index_reader); } case BlockBasedTableOptions::kBinarySearch: FALLTHROUGH_INTENDED; case BlockBasedTableOptions::kBinarySearchWithFirstKey: { - return BinarySearchIndexReader::Create(this, prefetch_buffer, use_cache, - prefetch, pin, lookup_context, - index_reader); + return BinarySearchIndexReader::Create(this, ro, prefetch_buffer, + use_cache, prefetch, pin, + lookup_context, index_reader); } case BlockBasedTableOptions::kHashSearch: { std::unique_ptr metaindex_guard; @@ -2965,7 +2986,7 @@ Status BlockBasedTable::CreateIndexReader( " search index."); should_fallback = true; } else if (meta_index_iter == nullptr) { - auto s = ReadMetaIndexBlock(prefetch_buffer, &metaindex_guard, + auto s = ReadMetaIndexBlock(ro, prefetch_buffer, &metaindex_guard, &metaindex_iter_guard); if (!s.ok()) { // we simply fall back to binary search in case there is any @@ -2979,13 +3000,13 @@ Status BlockBasedTable::CreateIndexReader( } if (should_fallback) { - return BinarySearchIndexReader::Create(this, prefetch_buffer, use_cache, - prefetch, pin, lookup_context, - index_reader); + return BinarySearchIndexReader::Create(this, ro, prefetch_buffer, + use_cache, prefetch, pin, + lookup_context, index_reader); } else { - return HashIndexReader::Create(this, prefetch_buffer, meta_index_iter, - use_cache, prefetch, pin, lookup_context, - index_reader); + return 
HashIndexReader::Create(this, ro, prefetch_buffer, + meta_index_iter, use_cache, prefetch, + pin, lookup_context, index_reader); } } default: { @@ -3170,7 +3191,8 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { "--------------------------------------\n"); std::unique_ptr metaindex; std::unique_ptr metaindex_iter; - Status s = ReadMetaIndexBlock(nullptr /* prefetch_buffer */, &metaindex, + ReadOptions ro; + Status s = ReadMetaIndexBlock(ro, nullptr /* prefetch_buffer */, &metaindex, &metaindex_iter); if (s.ok()) { for (metaindex_iter->SeekToFirst(); metaindex_iter->Valid(); diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 87bf82e53..889a4a29f 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -87,7 +87,7 @@ class BlockBasedTable : public TableReader { // are set. // @param force_direct_prefetch if true, always prefetching to RocksDB // buffer, rather than calling RandomAccessFile::Prefetch(). - static Status Open(const ImmutableCFOptions& ioptions, + static Status Open(const ReadOptions& ro, const ImmutableCFOptions& ioptions, const EnvOptions& env_options, const BlockBasedTableOptions& table_options, const InternalKeyComparator& internal_key_comparator, @@ -205,7 +205,7 @@ class BlockBasedTable : public TableReader { virtual size_t ApproximateMemoryUsage() const = 0; // Cache the dependencies of the index reader (e.g. the partitions // of a partitioned index). - virtual void CacheDependencies(bool /* pin */) {} + virtual void CacheDependencies(const ReadOptions& /*ro*/, bool /* pin */) {} }; class IndexReaderCommon; @@ -379,7 +379,8 @@ class BlockBasedTable : public TableReader { // Optionally, user can pass a preloaded meta_index_iter for the index that // need to access extra meta blocks for index construction. This parameter // helps avoid re-reading meta index block if caller already created one. 
- Status CreateIndexReader(FilePrefetchBuffer* prefetch_buffer, + Status CreateIndexReader(const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, InternalIterator* preloaded_meta_index_iter, bool use_cache, bool prefetch, bool pin, BlockCacheLookupContext* lookup_context, @@ -401,28 +402,32 @@ class BlockBasedTable : public TableReader { // If force_direct_prefetch is true, always prefetching to RocksDB // buffer, rather than calling RandomAccessFile::Prefetch(). static Status PrefetchTail( - RandomAccessFileReader* file, uint64_t file_size, + const ReadOptions& ro, RandomAccessFileReader* file, uint64_t file_size, bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all, const bool preload_all, std::unique_ptr* prefetch_buffer); - Status ReadMetaIndexBlock(FilePrefetchBuffer* prefetch_buffer, + Status ReadMetaIndexBlock(const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, std::unique_ptr* metaindex_block, std::unique_ptr* iter); - Status TryReadPropertiesWithGlobalSeqno(FilePrefetchBuffer* prefetch_buffer, + Status TryReadPropertiesWithGlobalSeqno(const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value, TableProperties** table_properties); - Status ReadPropertiesBlock(FilePrefetchBuffer* prefetch_buffer, + Status ReadPropertiesBlock(const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, const SequenceNumber largest_seqno); - Status ReadRangeDelBlock(FilePrefetchBuffer* prefetch_buffer, + Status ReadRangeDelBlock(const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, const InternalKeyComparator& internal_comparator, BlockCacheLookupContext* lookup_context); Status PrefetchIndexAndFilterBlocks( - FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, - BlockBasedTable* new_table, bool prefetch_all, - const BlockBasedTableOptions& table_options, const int level, - size_t file_size, size_t 
max_file_size_for_l0_meta_pin, + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter, BlockBasedTable* new_table, + bool prefetch_all, const BlockBasedTableOptions& table_options, + const int level, size_t file_size, size_t max_file_size_for_l0_meta_pin, BlockCacheLookupContext* lookup_context); static BlockType GetBlockTypeForMetaBlockByName(const Slice& meta_block_name); @@ -433,8 +438,9 @@ class BlockBasedTable : public TableReader { // Create the filter from the filter block. std::unique_ptr CreateFilterBlockReader( - FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, - bool pin, BlockCacheLookupContext* lookup_context); + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context); static void SetupCacheKeyPrefix(Rep* rep); diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 194010fcf..6062b3d4e 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -90,7 +90,8 @@ class BlockBasedTableReaderTest ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size)); std::unique_ptr table_reader; - ASSERT_OK(BlockBasedTable::Open(ioptions, EnvOptions(), + ReadOptions ro; + ASSERT_OK(BlockBasedTable::Open(ro, ioptions, EnvOptions(), table_factory_->table_options(), comparator, std::move(file), file_size, &table_reader)); diff --git a/table/block_based/filter_block.h b/table/block_based/filter_block.h index eb3e92235..d94c7e606 100644 --- a/table/block_based/filter_block.h +++ b/table/block_based/filter_block.h @@ -153,7 +153,7 @@ class FilterBlockReader { return error_msg; } - virtual void CacheDependencies(bool /*pin*/) {} + virtual void CacheDependencies(const ReadOptions& /*ro*/, bool /*pin*/) {} virtual bool RangeMayExist(const Slice* /*iterate_upper_bound*/, const Slice& user_key, diff --git 
a/table/block_based/full_filter_block.cc b/table/block_based/full_filter_block.cc index ad64fc2a9..a104bec47 100644 --- a/table/block_based/full_filter_block.cc +++ b/table/block_based/full_filter_block.cc @@ -119,18 +119,18 @@ bool FullFilterBlockReader::KeyMayMatch( } std::unique_ptr FullFilterBlockReader::Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context) { + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context) { assert(table); assert(table->get_rep()); assert(!pin || prefetch); CachableEntry filter_block; if (prefetch || !use_cache) { - const Status s = ReadFilterBlock(table, prefetch_buffer, ReadOptions(), - use_cache, nullptr /* get_context */, - lookup_context, &filter_block); + const Status s = ReadFilterBlock(table, prefetch_buffer, ro, use_cache, + nullptr /* get_context */, lookup_context, + &filter_block); if (!s.ok()) { IGNORE_STATUS_IF_ERROR(s); return std::unique_ptr(); diff --git a/table/block_based/full_filter_block.h b/table/block_based/full_filter_block.h index 324b1a826..42f4dbbc3 100644 --- a/table/block_based/full_filter_block.h +++ b/table/block_based/full_filter_block.h @@ -87,9 +87,9 @@ class FullFilterBlockReader CachableEntry&& filter_block); static std::unique_ptr Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context); + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context); bool IsBlockBased() override { return false; } diff --git a/table/block_based/hash_index_reader.cc b/table/block_based/hash_index_reader.cc index c1648bbe1..d15cbd217 100644 --- 
a/table/block_based/hash_index_reader.cc +++ b/table/block_based/hash_index_reader.cc @@ -13,6 +13,7 @@ namespace ROCKSDB_NAMESPACE { Status HashIndexReader::Create(const BlockBasedTable* table, + const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_index_iter, bool use_cache, bool prefetch, bool pin, @@ -28,7 +29,7 @@ Status HashIndexReader::Create(const BlockBasedTable* table, CachableEntry index_block; if (prefetch || !use_cache) { const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + ReadIndexBlock(table, prefetch_buffer, ro, use_cache, /*get_context=*/nullptr, lookup_context, &index_block); if (!s.ok()) { return s; diff --git a/table/block_based/hash_index_reader.h b/table/block_based/hash_index_reader.h index fecd1e5c8..9037efc87 100644 --- a/table/block_based/hash_index_reader.h +++ b/table/block_based/hash_index_reader.h @@ -15,7 +15,7 @@ namespace ROCKSDB_NAMESPACE { // key. class HashIndexReader : public BlockBasedTable::IndexReaderCommon { public: - static Status Create(const BlockBasedTable* table, + static Status Create(const BlockBasedTable* table, const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_index_iter, bool use_cache, bool prefetch, bool pin, diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index d59756024..bdf9250db 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -7,6 +7,7 @@ #include +#include "file/file_util.h" #include "monitoring/perf_context_imp.h" #include "port/malloc.h" #include "port/port.h" @@ -149,18 +150,18 @@ PartitionedFilterBlockReader::PartitionedFilterBlockReader( : FilterBlockReaderCommon(t, std::move(filter_block)) {} std::unique_ptr PartitionedFilterBlockReader::Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* 
lookup_context) { + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context) { assert(table); assert(table->get_rep()); assert(!pin || prefetch); CachableEntry filter_block; if (prefetch || !use_cache) { - const Status s = ReadFilterBlock(table, prefetch_buffer, ReadOptions(), - use_cache, nullptr /* get_context */, - lookup_context, &filter_block); + const Status s = ReadFilterBlock(table, prefetch_buffer, ro, use_cache, + nullptr /* get_context */, lookup_context, + &filter_block); if (!s.ok()) { IGNORE_STATUS_IF_ERROR(s); return std::unique_ptr(); @@ -411,7 +412,8 @@ size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const { } // TODO(myabandeh): merge this with the same function in IndexReader -void PartitionedFilterBlockReader::CacheDependencies(bool pin) { +void PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro, + bool pin) { assert(table()); const BlockBasedTable::Rep* const rep = table()->get_rep(); @@ -457,11 +459,14 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) { std::unique_ptr prefetch_buffer; prefetch_buffer.reset(new FilePrefetchBuffer()); - s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off, - static_cast(prefetch_len)); + IOOptions opts; + s = PrepareIOFromReadOptions(ro, rep->file->env(), opts); + if (s.ok()) { + s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off, + static_cast(prefetch_len)); + } // After prefetch, read the partitions one by one - ReadOptions read_options; for (biter.SeekToFirst(); biter.Valid(); biter.Next()) { handle = biter.value().handle; @@ -469,9 +474,9 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) { // TODO: Support counter batch update for partitioned index and // filter blocks s = table()->MaybeReadBlockAndLoadToCache( - prefetch_buffer.get(), read_options, handle, - UncompressionDict::GetEmptyDict(), &block, 
BlockType::kFilter, - nullptr /* get_context */, &lookup_context, nullptr /* contents */); + prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(), + &block, BlockType::kFilter, nullptr /* get_context */, &lookup_context, + nullptr /* contents */); assert(s.ok() || block.GetValue() == nullptr); if (s.ok() && block.GetValue() != nullptr) { diff --git a/table/block_based/partitioned_filter_block.h b/table/block_based/partitioned_filter_block.h index 111eae107..2ccc8f8bc 100644 --- a/table/block_based/partitioned_filter_block.h +++ b/table/block_based/partitioned_filter_block.h @@ -71,9 +71,9 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon { CachableEntry&& filter_block); static std::unique_ptr Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context); + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context); bool IsBlockBased() override { return false; } bool KeyMayMatch(const Slice& key, const SliceTransform* prefix_extractor, @@ -130,7 +130,7 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon { uint64_t block_offset, BlockHandle filter_handle, bool no_io, BlockCacheLookupContext* lookup_context, FilterManyFunction filter_function) const; - void CacheDependencies(bool pin) override; + void CacheDependencies(const ReadOptions& ro, bool pin) override; const InternalKeyComparator* internal_comparator() const; bool index_key_includes_seq() const; diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc index d235f0080..f8bb9562f 100644 --- a/table/block_based/partitioned_index_reader.cc +++ b/table/block_based/partitioned_index_reader.cc @@ -7,13 +7,15 @@ // Use of this source code is governed by a BSD-style license that can be // found in 
the LICENSE file. See the AUTHORS file for names of contributors. #include "table/block_based/partitioned_index_reader.h" + +#include "file/file_util.h" #include "table/block_based/partitioned_index_iterator.h" namespace ROCKSDB_NAMESPACE { Status PartitionIndexReader::Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context, std::unique_ptr* index_reader) { assert(table != nullptr); assert(table->get_rep()); @@ -23,7 +25,7 @@ Status PartitionIndexReader::Create( CachableEntry index_block; if (prefetch || !use_cache) { const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + ReadIndexBlock(table, prefetch_buffer, ro, use_cache, /*get_context=*/nullptr, lookup_context, &index_block); if (!s.ok()) { return s; @@ -75,6 +77,7 @@ InternalIteratorBase* PartitionIndexReader::NewIterator( } else { ReadOptions ro; ro.fill_cache = read_options.fill_cache; + ro.deadline = read_options.deadline; // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. std::unique_ptr> index_iter( @@ -100,7 +103,7 @@ InternalIteratorBase* PartitionIndexReader::NewIterator( // the first level iter is always on heap and will attempt to delete it // in its destructor. 
} -void PartitionIndexReader::CacheDependencies(bool pin) { +void PartitionIndexReader::CacheDependencies(const ReadOptions& ro, bool pin) { // Before read partitions, prefetch them to avoid lots of IOs BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch}; const BlockBasedTable::Rep* rep = table()->rep_; @@ -147,12 +150,15 @@ void PartitionIndexReader::CacheDependencies(bool pin) { uint64_t prefetch_len = last_off - prefetch_off; std::unique_ptr prefetch_buffer; rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer); - s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off, - static_cast(prefetch_len)); + IOOptions opts; + s = PrepareIOFromReadOptions(ro, rep->file->env(), opts); + if (s.ok()) { + s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off, + static_cast(prefetch_len)); + } // After prefetch, read the partitions one by one biter.SeekToFirst(); - auto ro = ReadOptions(); for (; biter.Valid(); biter.Next()) { handle = biter.value().handle; CachableEntry block; diff --git a/table/block_based/partitioned_index_reader.h b/table/block_based/partitioned_index_reader.h index 86397fd58..6be868f93 100644 --- a/table/block_based/partitioned_index_reader.h +++ b/table/block_based/partitioned_index_reader.h @@ -17,7 +17,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon { // `PartitionIndexReader`. // On success, index_reader will be populated; otherwise it will remain // unmodified. 
- static Status Create(const BlockBasedTable* table, + static Status Create(const BlockBasedTable* table, const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, bool pin, BlockCacheLookupContext* lookup_context, @@ -29,7 +29,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon { IndexBlockIter* iter, GetContext* get_context, BlockCacheLookupContext* lookup_context) override; - void CacheDependencies(bool pin) override; + void CacheDependencies(const ReadOptions& ro, bool pin) override; size_t ApproximateMemoryUsage() const override { size_t usage = ApproximateIndexBlockMemoryUsage(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE diff --git a/table/block_based/uncompression_dict_reader.cc b/table/block_based/uncompression_dict_reader.cc index 78e2b93c1..db33e9340 100644 --- a/table/block_based/uncompression_dict_reader.cc +++ b/table/block_based/uncompression_dict_reader.cc @@ -12,9 +12,9 @@ namespace ROCKSDB_NAMESPACE { Status UncompressionDictReader::Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context, std::unique_ptr* uncompression_dict_reader) { assert(table); assert(table->get_rep()); @@ -24,8 +24,8 @@ Status UncompressionDictReader::Create( CachableEntry uncompression_dict; if (prefetch || !use_cache) { const Status s = ReadUncompressionDictionary( - table, prefetch_buffer, ReadOptions(), use_cache, - nullptr /* get_context */, lookup_context, &uncompression_dict); + table, prefetch_buffer, ro, use_cache, nullptr /* get_context */, + lookup_context, &uncompression_dict); if (!s.ok()) { return s; } diff --git a/table/block_based/uncompression_dict_reader.h b/table/block_based/uncompression_dict_reader.h index 3e7826179..e8801e815 
100644 --- a/table/block_based/uncompression_dict_reader.h +++ b/table/block_based/uncompression_dict_reader.h @@ -25,9 +25,9 @@ struct UncompressionDict; class UncompressionDictReader { public: static Status Create( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - bool use_cache, bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, + const BlockBasedTable* table, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch, + bool pin, BlockCacheLookupContext* lookup_context, std::unique_ptr* uncompression_dict_reader); Status GetOrReadUncompressionDictionary( diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 627af7f23..355f7d736 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -58,16 +58,19 @@ inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() { } inline bool BlockFetcher::TryGetFromPrefetchBuffer() { - if (prefetch_buffer_ != nullptr && - prefetch_buffer_->TryReadFromCache( - handle_.offset(), block_size_with_trailer_, &slice_, - for_compaction_)) { - CheckBlockChecksum(); - if (!status_.ok()) { - return true; + if (prefetch_buffer_ != nullptr) { + IOOptions opts; + Status s = PrepareIOFromReadOptions(read_options_, file_->env(), opts); + if (s.ok() && prefetch_buffer_->TryReadFromCache( + opts, handle_.offset(), block_size_with_trailer_, &slice_, + for_compaction_)) { + CheckBlockChecksum(); + if (!status_.ok()) { + return true; + } + got_from_prefetch_buffer_ = true; + used_buf_ = const_cast(slice_.data()); } - got_from_prefetch_buffer_ = true; - used_buf_ = const_cast(slice_.data()); } return got_from_prefetch_buffer_; } diff --git a/table/block_fetcher_test.cc b/table/block_fetcher_test.cc index bc1bda715..04bf8186c 100644 --- a/table/block_fetcher_test.cc +++ b/table/block_fetcher_test.cc @@ -244,7 +244,8 @@ class BlockFetcherTest : public testing::Test { ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size)); std::unique_ptr 
table_reader; - ASSERT_OK(BlockBasedTable::Open(ioptions, EnvOptions(), + ReadOptions ro; + ASSERT_OK(BlockBasedTable::Open(ro, ioptions, EnvOptions(), table_factory_.table_options(), comparator, std::move(file), file_size, &table_reader)); @@ -259,8 +260,9 @@ class BlockFetcherTest : public testing::Test { void ReadFooter(RandomAccessFileReader* file, Footer* footer) { uint64_t file_size = 0; ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size)); - ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, footer, - kBlockBasedTableMagicNumber); + IOOptions opts; + ReadFooterFromFile(opts, file, nullptr /* prefetch_buffer */, file_size, + footer, kBlockBasedTableMagicNumber); } // NOTE: compression_type returns the compression type of the fetched block @@ -315,8 +317,9 @@ class BlockFetcherTest : public testing::Test { NewTableReader(ioptions, foptions, comparator, table_name, &table); std::unique_ptr index_reader; + ReadOptions ro; ASSERT_OK(BinarySearchIndexReader::Create( - table.get(), nullptr /* prefetch_buffer */, false /* use_cache */, + table.get(), ro, nullptr /* prefetch_buffer */, false /* use_cache */, false /* prefetch */, false /* pin */, nullptr /* lookup_context */, &index_reader)); diff --git a/table/cuckoo/cuckoo_table_factory.cc b/table/cuckoo/cuckoo_table_factory.cc index 46adf8a4f..5ab8b2f42 100644 --- a/table/cuckoo/cuckoo_table_factory.cc +++ b/table/cuckoo/cuckoo_table_factory.cc @@ -13,7 +13,7 @@ namespace ROCKSDB_NAMESPACE { Status CuckooTableFactory::NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& /*ro*/, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table, bool /*prefetch_index_and_filter_in_cache*/) const { diff --git a/table/cuckoo/cuckoo_table_factory.h b/table/cuckoo/cuckoo_table_factory.h index 6a8c36981..d868a2db4 100644 --- a/table/cuckoo/cuckoo_table_factory.h +++ b/table/cuckoo/cuckoo_table_factory.h @@ -58,8 
+58,9 @@ class CuckooTableFactory : public TableFactory { const char* Name() const override { return "CuckooTable"; } + using TableFactory::NewTableReader; Status NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& ro, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table, bool prefetch_index_and_filter_in_cache = true) const override; diff --git a/table/format.cc b/table/format.cc index de4e29664..0dfa6e254 100644 --- a/table/format.cc +++ b/table/format.cc @@ -281,7 +281,7 @@ std::string Footer::ToString() const { return result; } -Status ReadFooterFromFile(RandomAccessFileReader* file, +Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, uint64_t file_size, Footer* footer, uint64_t enforce_table_magic_number) { @@ -300,15 +300,20 @@ Status ReadFooterFromFile(RandomAccessFileReader* file, ? static_cast(file_size - Footer::kMaxEncodedLength) : 0; Status s; + // TODO: Need to pass appropriate deadline to TryReadFromCache(). Right now, + // there is no readahead for point lookups, so TryReadFromCache will fail if + // the required data is not in the prefetch buffer. Once deadline is enabled + // for iterator, TryReadFromCache might do a readahead. 
Revisit to see if we + // need to pass a timeout at that point if (prefetch_buffer == nullptr || - !prefetch_buffer->TryReadFromCache(read_offset, Footer::kMaxEncodedLength, - &footer_input)) { + !prefetch_buffer->TryReadFromCache( + IOOptions(), read_offset, Footer::kMaxEncodedLength, &footer_input)) { if (file->use_direct_io()) { - s = file->Read(IOOptions(), read_offset, Footer::kMaxEncodedLength, + s = file->Read(opts, read_offset, Footer::kMaxEncodedLength, &footer_input, nullptr, &internal_buf); } else { footer_buf.reserve(Footer::kMaxEncodedLength); - s = file->Read(IOOptions(), read_offset, Footer::kMaxEncodedLength, + s = file->Read(opts, read_offset, Footer::kMaxEncodedLength, &footer_input, &footer_buf[0], nullptr); } if (!s.ok()) return s; diff --git a/table/format.h b/table/format.h index 725435900..5b6e6a925 100644 --- a/table/format.h +++ b/table/format.h @@ -215,7 +215,7 @@ class Footer { // Read the footer from file // If enforce_table_magic_number != 0, ReadFooterFromFile() will return // corruption if table_magic number is not equal to enforce_table_magic_number -Status ReadFooterFromFile(RandomAccessFileReader* file, +Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, uint64_t file_size, Footer* footer, uint64_t enforce_table_magic_number = 0); diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 7ac21f5dc..21c4c1fb0 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -193,7 +193,8 @@ bool NotifyCollectTableCollectorsOnFinish( return all_succeeded; } -Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, +Status ReadProperties(const ReadOptions& read_options, + const Slice& handle_value, RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ImmutableCFOptions& ioptions, TableProperties** table_properties, bool verify_checksum, @@ -210,16 +211,16 @@ Status ReadProperties(const Slice& 
handle_value, RandomAccessFileReader* file, } BlockContents block_contents; - ReadOptions read_options; - read_options.verify_checksums = verify_checksum; Status s; PersistentCacheOptions cache_options; + ReadOptions ro = read_options; + ro.verify_checksums = verify_checksum; - BlockFetcher block_fetcher( - file, prefetch_buffer, footer, read_options, handle, &block_contents, - ioptions, false /* decompress */, false /*maybe_compressed*/, - BlockType::kProperties, UncompressionDict::GetEmptyDict(), cache_options, - memory_allocator); + BlockFetcher block_fetcher(file, prefetch_buffer, footer, ro, handle, + &block_contents, ioptions, false /* decompress */, + false /*maybe_compressed*/, BlockType::kProperties, + UncompressionDict::GetEmptyDict(), cache_options, + memory_allocator); s = block_fetcher.ReadBlockContents(); // property block is never compressed. Need to add uncompress logic if we are // to compress it.. @@ -368,7 +369,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, FilePrefetchBuffer* prefetch_buffer) { // -- Read metaindex block Footer footer; - auto s = ReadFooterFromFile(file, prefetch_buffer, file_size, &footer, + IOOptions opts; + auto s = ReadFooterFromFile(opts, file, prefetch_buffer, file_size, &footer, table_magic_number); if (!s.ok()) { return s; @@ -405,11 +407,11 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, TableProperties table_properties; if (found_properties_block == true) { - s = ReadProperties(meta_iter->value(), file, prefetch_buffer, footer, - ioptions, properties, false /* verify_checksum */, - nullptr /* ret_block_hanel */, - nullptr /* ret_block_contents */, - compression_type_missing, memory_allocator); + s = ReadProperties( + read_options, meta_iter->value(), file, prefetch_buffer, footer, + ioptions, properties, false /* verify_checksum */, + nullptr /* ret_block_hanel */, nullptr /* ret_block_contents */, + compression_type_missing, memory_allocator); } else { 
s = Status::NotFound(); } @@ -438,8 +440,9 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, bool /*compression_type_missing*/, MemoryAllocator* memory_allocator) { Footer footer; - auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, - &footer, table_magic_number); + IOOptions opts; + auto s = ReadFooterFromFile(opts, file, nullptr /* prefetch_buffer */, + file_size, &footer, table_magic_number); if (!s.ok()) { return s; } @@ -480,7 +483,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file, MemoryAllocator* memory_allocator) { Status status; Footer footer; - status = ReadFooterFromFile(file, prefetch_buffer, file_size, &footer, + IOOptions opts; + status = ReadFooterFromFile(opts, file, prefetch_buffer, file_size, &footer, table_magic_number); if (!status.ok()) { return status; diff --git a/table/meta_blocks.h b/table/meta_blocks.h index 40a6f33fb..319b2c712 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -99,7 +99,8 @@ bool NotifyCollectTableCollectorsOnFinish( // @returns a status to indicate if the operation succeeded. On success, // *table_properties will point to a heap-allocated TableProperties // object, otherwise value of `table_properties` will not be modified. 
-Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, +Status ReadProperties(const ReadOptions& ro, const Slice& handle_value, + RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ImmutableCFOptions& ioptions, TableProperties** table_properties, bool verify_checksum, diff --git a/table/mock_table.cc b/table/mock_table.cc index a1a5ed8d7..30ec9a671 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -61,6 +61,7 @@ std::shared_ptr MockTableReader::GetTableProperties() MockTableFactory::MockTableFactory() : next_id_(1) {} Status MockTableFactory::NewTableReader( + const ReadOptions& /*ro*/, const TableReaderOptions& /*table_reader_options*/, std::unique_ptr&& file, uint64_t /*file_size*/, std::unique_ptr* table_reader, diff --git a/table/mock_table.h b/table/mock_table.h index 097809c0c..7d89fd382 100644 --- a/table/mock_table.h +++ b/table/mock_table.h @@ -176,8 +176,9 @@ class MockTableFactory : public TableFactory { public: MockTableFactory(); const char* Name() const override { return "MockTable"; } + using TableFactory::NewTableReader; Status NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& ro, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table_reader, bool prefetch_index_and_filter_in_cache = true) const override; diff --git a/table/plain/plain_table_factory.cc b/table/plain/plain_table_factory.cc index e217640ff..86e6a32f4 100644 --- a/table/plain/plain_table_factory.cc +++ b/table/plain/plain_table_factory.cc @@ -49,7 +49,7 @@ static std::unordered_map plain_table_type_info = { OptionTypeFlags::kNone, 0}}}; Status PlainTableFactory::NewTableReader( - const TableReaderOptions& table_reader_options, + const ReadOptions& /*ro*/, const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table, bool 
/*prefetch_index_and_filter_in_cache*/) const { diff --git a/table/plain/plain_table_factory.h b/table/plain/plain_table_factory.h index 282a24a5b..f267127b0 100644 --- a/table/plain/plain_table_factory.h +++ b/table/plain/plain_table_factory.h @@ -35,7 +35,7 @@ class TableBuilder; // 1. Data compression is not supported. // 2. Data is not checksumed. // it is not recommended to use this format on other type of file systems. -// +// // PlainTable requires fixed length key, configured as a constructor // parameter of the factory class. Output file format: // +-------------+-----------------+ @@ -160,7 +160,9 @@ class PlainTableFactory : public TableFactory { : table_options_(_table_options) {} const char* Name() const override { return "PlainTable"; } - Status NewTableReader(const TableReaderOptions& table_reader_options, + using TableFactory::NewTableReader; + Status NewTableReader(const ReadOptions& ro, + const TableReaderOptions& table_reader_options, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table, bool prefetch_index_and_filter_in_cache) const override; diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc index aea13661d..7d5d6e421 100644 --- a/table/sst_file_dumper.cc +++ b/table/sst_file_dumper.cc @@ -103,10 +103,12 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) { ? 
kSstDumpTailPrefetchSize : file_size; uint64_t prefetch_off = file_size - prefetch_size; - prefetch_buffer.Prefetch(file_.get(), prefetch_off, + IOOptions opts; + prefetch_buffer.Prefetch(opts, file_.get(), prefetch_off, static_cast(prefetch_size)); - s = ReadFooterFromFile(file_.get(), &prefetch_buffer, file_size, &footer); + s = ReadFooterFromFile(opts, file_.get(), &prefetch_buffer, file_size, + &footer); } if (s.ok()) { magic_number = footer.table_magic_number(); diff --git a/table/table_test.cc b/table/table_test.cc index 5899ae945..b77845ce5 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -4321,8 +4321,10 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { uint64_t file_size = ss_rw.contents().size(); Footer footer; - ASSERT_OK(ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size, - &footer, kBlockBasedTableMagicNumber)); + IOOptions opts; + ASSERT_OK(ReadFooterFromFile(opts, file, nullptr /* prefetch_buffer */, + file_size, &footer, + kBlockBasedTableMagicNumber)); auto BlockFetchHelper = [&](const BlockHandle& handle, BlockType block_type, BlockContents* contents) { @@ -4408,7 +4410,8 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) { // read footer Footer footer; - ASSERT_OK(ReadFooterFromFile(table_reader.get(), + IOOptions opts; + ASSERT_OK(ReadFooterFromFile(opts, table_reader.get(), nullptr /* prefetch_buffer */, table_size, &footer, kBlockBasedTableMagicNumber)); @@ -4505,9 +4508,10 @@ TEST_F(BBTTailPrefetchTest, TestTailPrefetchStats) { TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) { TailPrefetchStats tpstats; FilePrefetchBuffer buffer(nullptr, 0, 0, false, true); - buffer.TryReadFromCache(500, 10, nullptr); - buffer.TryReadFromCache(480, 10, nullptr); - buffer.TryReadFromCache(490, 10, nullptr); + IOOptions opts; + buffer.TryReadFromCache(opts, 500, 10, nullptr); + buffer.TryReadFromCache(opts, 480, 10, nullptr); + buffer.TryReadFromCache(opts, 490, 10, nullptr); ASSERT_EQ(480, 
buffer.min_offset_read()); } diff --git a/utilities/options/options_util_test.cc b/utilities/options/options_util_test.cc index 0062ad1cd..f493de887 100644 --- a/utilities/options/options_util_test.cc +++ b/utilities/options/options_util_test.cc @@ -170,7 +170,9 @@ class DummyTableFactory : public TableFactory { const char* Name() const override { return "DummyTableFactory"; } + using TableFactory::NewTableReader; Status NewTableReader( + const ReadOptions& /*ro*/, const TableReaderOptions& /*table_reader_options*/, std::unique_ptr&& /*file*/, uint64_t /*file_size*/, std::unique_ptr* /*table_reader*/,