From 17002365c11d66af172e210f577aed88ed434dce Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Thu, 1 Apr 2021 10:06:55 -0700 Subject: [PATCH] Replace Status with IOStatus for block fetcher IO function (#8130) Summary: To propagate the IOStatus from file reads to RocksDB read logic, some of the existing status needs to be replaced by IOStatus. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8130 Test Plan: make check Reviewed By: anand1976 Differential Revision: D27440188 Pulled By: zhichao-cao fbshipit-source-id: bbe7622c2106fe4e46871d60f7c26944e5030d78 --- file/random_access_file_reader.cc | 54 ++++++++++----------- file/random_access_file_reader.h | 20 ++++---- table/block_fetcher.cc | 81 ++++++++++++++++--------------- table/block_fetcher.h | 4 +- 4 files changed, 80 insertions(+), 79 deletions(-) diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 458b1bd00..e15b6b033 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -22,26 +22,26 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { -Status RandomAccessFileReader::Create( +IOStatus RandomAccessFileReader::Create( const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, std::unique_ptr* reader, IODebugContext* dbg) { std::unique_ptr file; - Status s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg); - if (s.ok()) { + IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg); + if (io_s.ok()) { reader->reset(new RandomAccessFileReader(std::move(file), fname)); } - return s; + return io_s; } -Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, - size_t n, Slice* result, char* scratch, - AlignedBuf* aligned_buf, - bool for_compaction) const { +IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, + size_t n, Slice* result, char* scratch, + AlignedBuf* aligned_buf, + bool for_compaction) const { (void)aligned_buf; TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr); - Status s; + IOStatus io_s; uint64_t elapsed = 0; { StopWatch sw(clock_, stats_, hist_type_, @@ -86,22 +86,22 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, // one iteration of this loop, so we don't need to check and adjust // the opts.timeout before calling file_->Read assert(!opts.timeout.count() || allowed == read_size); - s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts, - &tmp, buf.Destination(), nullptr); + io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts, + &tmp, buf.Destination(), nullptr); } if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, - s); + io_s); } buf.Size(buf.CurrentSize() + tmp.size()); - if (!s.ok() || tmp.size() < allowed) { + if (!io_s.ok() || tmp.size() < allowed) { break; } } size_t res_len = 0; - if (s.ok() && offset_advance < buf.CurrentSize()) { + if (io_s.ok() && offset_advance < buf.CurrentSize()) { res_len = std::min(buf.CurrentSize() - offset_advance, n); if (aligned_buf == nullptr) { buf.Read(scratch, offset_advance, res_len); @@ -146,14 +146,14 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, // one iteration of this loop, so we don't need to check and adjust // the opts.timeout before calling file_->Read assert(!opts.timeout.count() || allowed == n); - s = file_->Read(offset + pos, allowed, opts, &tmp_result, - scratch + pos, nullptr); + io_s = file_->Read(offset + pos, allowed, opts, &tmp_result, + scratch + pos, nullptr); } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts, - finish_ts, s); + finish_ts, io_s); } #endif @@ -166,11 +166,11 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, assert(tmp_result.data() == res_scratch + pos); } pos += tmp_result.size(); - if (!s.ok() || tmp_result.size() < allowed) { + if (!io_s.ok() || tmp_result.size() < allowed) { break; } } - *result = Slice(res_scratch, s.ok() ? pos : 0); + *result = Slice(res_scratch, io_s.ok() ? pos : 0); } IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); SetPerfLevel(prev_perf_level); @@ -179,7 +179,7 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, file_read_hist_->Add(elapsed); } - return s; + return io_s; } size_t End(const FSReadRequest& r) { @@ -208,13 +208,13 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) { return true; } -Status RandomAccessFileReader::MultiRead(const IOOptions& opts, - FSReadRequest* read_reqs, - size_t num_reqs, - AlignedBuf* aligned_buf) const { +IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts, + FSReadRequest* read_reqs, + size_t num_reqs, + AlignedBuf* aligned_buf) const { (void)aligned_buf; // suppress warning of unused variable in LITE mode assert(num_reqs > 0); - Status s; + IOStatus io_s; uint64_t elapsed = 0; { StopWatch sw(clock_, stats_, hist_type_, @@ -280,7 +280,7 @@ Status RandomAccessFileReader::MultiRead(const IOOptions& opts, { IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_); - s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr); + io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr); } #ifndef ROCKSDB_LITE @@ -321,7 +321,7 @@ Status RandomAccessFileReader::MultiRead(const IOOptions& opts, file_read_hist_->Add(elapsed); } - return s; + return io_s; } IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro, diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 4f39409bb..181f4dd02 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -103,10 +103,10 @@ class RandomAccessFileReader { #endif } - static Status Create(const std::shared_ptr& fs, - const std::string& fname, const FileOptions& file_opts, - std::unique_ptr* reader, - IODebugContext* dbg); + static IOStatus Create(const std::shared_ptr& fs, + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* reader, + IODebugContext* dbg); RandomAccessFileReader(const RandomAccessFileReader&) = delete; RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; @@ -120,19 +120,19 @@ class RandomAccessFileReader { // 2. Otherwise, scratch is not used and can be null, the aligned_buf owns // the internally allocated buffer on return, and the result refers to a // region in aligned_buf. - Status Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result, - char* scratch, AlignedBuf* aligned_buf, - bool for_compaction = false) const; + IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result, + char* scratch, AlignedBuf* aligned_buf, + bool for_compaction = false) const; // REQUIRES: // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing. // In non-direct IO mode, aligned_buf should be null; // In direct IO mode, aligned_buf stores the aligned buffer allocated inside // MultiRead, the result Slices in reqs refer to aligned_buf. - Status MultiRead(const IOOptions& opts, FSReadRequest* reqs, size_t num_reqs, - AlignedBuf* aligned_buf) const; + IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs, + size_t num_reqs, AlignedBuf* aligned_buf) const; - Status Prefetch(uint64_t offset, size_t n) const { + IOStatus Prefetch(uint64_t offset, size_t n) const { return file_->Prefetch(offset, n, IOOptions(), nullptr); } diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 588dae474..dd39c0962 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -29,9 +29,9 @@ namespace ROCKSDB_NAMESPACE { inline void BlockFetcher::CheckBlockChecksum() { // Check the crc of the type and the block contents if (read_options_.verify_checksums) { - status_ = ROCKSDB_NAMESPACE::VerifyBlockChecksum( + io_status_ = status_to_io_status(ROCKSDB_NAMESPACE::VerifyBlockChecksum( footer_.checksum(), slice_.data(), block_size_, file_->file_name(), - handle_.offset()); + handle_.offset())); } } @@ -59,18 +59,18 @@ inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() { inline bool BlockFetcher::TryGetFromPrefetchBuffer() { if (prefetch_buffer_ != nullptr) { IOOptions opts; - Status s = file_->PrepareIOOptions(read_options_, opts); - if (s.ok() && prefetch_buffer_->TryReadFromCache( - opts, handle_.offset(), block_size_with_trailer_, &slice_, - &s, for_compaction_)) { + IOStatus io_s = file_->PrepareIOOptions(read_options_, opts); + if (io_s.ok() && prefetch_buffer_->TryReadFromCache( + opts, handle_.offset(), block_size_with_trailer_, + &slice_, &io_s, for_compaction_)) { CheckBlockChecksum(); - if (!status_.ok()) { + if (!io_status_.ok()) { return true; } got_from_prefetch_buffer_ = true; used_buf_ = const_cast(slice_.data()); - } else if (!s.ok()) { - status_ = s; + } else if (!io_s.ok()) { + io_status_ = io_s; return true; } } @@ -82,18 +82,18 @@ inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() { cache_options_.persistent_cache->IsCompressed()) { // lookup uncompressed cache mode p-cache std::unique_ptr raw_data; - status_ = PersistentCacheHelper::LookupRawPage( - cache_options_, handle_, &raw_data, block_size_with_trailer_); - if (status_.ok()) { + io_status_ = status_to_io_status(PersistentCacheHelper::LookupRawPage( + cache_options_, handle_, &raw_data, block_size_with_trailer_)); + if (io_status_.ok()) { heap_buf_ = CacheAllocationPtr(raw_data.release()); used_buf_ = heap_buf_.get(); slice_ = Slice(heap_buf_.get(), block_size_); return true; - } else if (!status_.IsNotFound() && ioptions_.info_log) { - assert(!status_.ok()); + } else if (!io_status_.IsNotFound() && ioptions_.info_log) { + assert(!io_status_.ok()); ROCKS_LOG_INFO(ioptions_.info_log, "Error reading from persistent cache. %s", - status_.ToString().c_str()); + io_status_.ToString().c_str()); } } return false; @@ -136,7 +136,7 @@ inline void BlockFetcher::PrepareBufferForBlockFromFile() { } inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() { - if (status_.ok() && read_options_.fill_cache && + if (io_status_.ok() && read_options_.fill_cache && cache_options_.persistent_cache && cache_options_.persistent_cache->IsCompressed()) { // insert to raw cache @@ -146,8 +146,8 @@ inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() { } inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() { - if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache && - cache_options_.persistent_cache && + if (io_status_.ok() && !got_from_prefetch_buffer_ && + read_options_.fill_cache && cache_options_.persistent_cache && !cache_options_.persistent_cache->IsCompressed()) { // insert to uncompressed cache PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_, @@ -215,26 +215,26 @@ inline void BlockFetcher::GetBlockContents() { #endif } -Status BlockFetcher::ReadBlockContents() { +IOStatus BlockFetcher::ReadBlockContents() { if (TryGetUncompressBlockFromPersistentCache()) { compression_type_ = kNoCompression; #ifndef NDEBUG contents_->is_raw_block = true; #endif // NDEBUG - return Status::OK(); + return IOStatus::OK(); } if (TryGetFromPrefetchBuffer()) { - if (!status_.ok()) { - return status_; + if (!io_status_.ok()) { + return io_status_; } } else if (!TryGetCompressedBlockFromPersistentCache()) { IOOptions opts; - status_ = file_->PrepareIOOptions(read_options_, opts); + io_status_ = file_->PrepareIOOptions(read_options_, opts); // Actual file read - if (status_.ok()) { + if (io_status_.ok()) { if (file_->use_direct_io()) { PERF_TIMER_GUARD(block_read_time); - status_ = + io_status_ = file_->Read(opts, handle_.offset(), block_size_with_trailer_, &slice_, nullptr, &direct_io_buf_, for_compaction_); PERF_COUNTER_ADD(block_read_count, 1); @@ -242,8 +242,9 @@ Status BlockFetcher::ReadBlockContents() { } else { PrepareBufferForBlockFromFile(); PERF_TIMER_GUARD(block_read_time); - status_ = file_->Read(opts, handle_.offset(), block_size_with_trailer_, - &slice_, used_buf_, nullptr, for_compaction_); + io_status_ = + file_->Read(opts, handle_.offset(), block_size_with_trailer_, + &slice_, used_buf_, nullptr, for_compaction_); PERF_COUNTER_ADD(block_read_count, 1); #ifndef NDEBUG if (slice_.data() == &stack_buf_[0]) { @@ -277,23 +278,23 @@ Status BlockFetcher::ReadBlockContents() { } PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_); - if (!status_.ok()) { - return status_; + if (!io_status_.ok()) { + return io_status_; } if (slice_.size() != block_size_with_trailer_) { - return Status::Corruption("truncated block read from " + - file_->file_name() + " offset " + - ToString(handle_.offset()) + ", expected " + - ToString(block_size_with_trailer_) + - " bytes, got " + ToString(slice_.size())); + return IOStatus::Corruption("truncated block read from " + + file_->file_name() + " offset " + + ToString(handle_.offset()) + ", expected " + + ToString(block_size_with_trailer_) + + " bytes, got " + ToString(slice_.size())); } CheckBlockChecksum(); - if (status_.ok()) { + if (io_status_.ok()) { InsertCompressedBlockToPersistentCacheIfNeeded(); } else { - return status_; + return io_status_; } } @@ -304,9 +305,9 @@ Status BlockFetcher::ReadBlockContents() { // compressed page, uncompress, update cache UncompressionContext context(compression_type_); UncompressionInfo info(context, uncompression_dict_, compression_type_); - status_ = UncompressBlockContents(info, slice_.data(), block_size_, - contents_, footer_.version(), ioptions_, - memory_allocator_); + io_status_ = status_to_io_status(UncompressBlockContents( + info, slice_.data(), block_size_, contents_, footer_.version(), + ioptions_, memory_allocator_)); #ifndef NDEBUG num_heap_buf_memcpy_++; #endif @@ -317,7 +318,7 @@ Status BlockFetcher::ReadBlockContents() { InsertUncompressedBlockToPersistentCacheIfNeeded(); - return status_; + return io_status_; } } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_fetcher.h b/table/block_fetcher.h index c03352e98..94050401a 100644 --- a/table/block_fetcher.h +++ b/table/block_fetcher.h @@ -64,7 +64,7 @@ class BlockFetcher { memory_allocator_compressed_(memory_allocator_compressed), for_compaction_(for_compaction) {} - Status ReadBlockContents(); + IOStatus ReadBlockContents(); CompressionType get_compression_type() const { return compression_type_; } #ifndef NDEBUG @@ -100,7 +100,7 @@ class BlockFetcher { const PersistentCacheOptions& cache_options_; MemoryAllocator* memory_allocator_; MemoryAllocator* memory_allocator_compressed_; - Status status_; + IOStatus io_status_; Slice slice_; char* used_buf_ = nullptr; AlignedBuf direct_io_buf_;