diff --git a/HISTORY.md b/HISTORY.md index 7c4738c15..24d498ec1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,7 +11,7 @@ ### New Features * DB::GetLiveFilesStorageInfo is ready for production use. * Add new stats PREFETCHED_BYTES_DISCARDED which records number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction and POLL_WAIT_MICROS records wait time for FS::Poll API completion. -* RemoteCompaction supports table_properties_collector_factories override on compaction worker. +* RemoteCompaction supports table_properties_collector_factories override on compaction worker. ### Public API changes * Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions. @@ -30,7 +30,8 @@ ### Behavior changes * Enforce the existing contract of SingleDelete so that SingleDelete cannot be mixed with Delete because it leads to undefined behavior. Fix a number of unit tests that violate the contract but happen to pass. * ldb `--try_load_options` default to true if `--db` is specified and not creating a new DB, the user can still explicitly disable that by `--try_load_options=false` (or explicitly enable that by `--try_load_options`). -* During Flush write or Compaction write, the WriteController is used to determine whether DB writes are stalled or slowed down. The priority (Env::IOPriority) can then be determined accordingly and be passed in IOOptions to the file system. +* During Flush write or Compaction write/read, the WriteController is used to determine whether DB writes are stalled or slowed down. The priority (Env::IOPriority) can then be determined accordingly and be passed in IOOptions to the file system. + ## 7.2.0 (04/15/2022) ### Bug Fixes diff --git a/db/builder.cc b/db/builder.cc index 9af93a736..41b99dca4 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -336,6 +336,7 @@ Status BuildTable( // we will regrad this verification as user reads since the goal is // to cache it here for further user reads ReadOptions read_options; + read_options.rate_limiter_priority = Env::IO_USER; std::unique_ptr it(table_cache->NewIterator( read_options, file_options, tboptions.internal_comparator, *meta, nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index d6451828a..36329496b 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1350,7 +1350,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { ReadOptions read_options; read_options.verify_checksums = true; read_options.fill_cache = false; - read_options.rate_limiter_priority = Env::IO_LOW; + read_options.rate_limiter_priority = GetRateLimiterPriority(); // Compaction iterators shouldn't be confined to a single prefix. // Compactions use Seek() for // (a) concurrent compactions, diff --git a/db/db_rate_limiter_test.cc b/db/db_rate_limiter_test.cc index f30af1974..0c2bca224 100644 --- a/db/db_rate_limiter_test.cc +++ b/db/db_rate_limiter_test.cc @@ -116,9 +116,8 @@ TEST_P(DBRateLimiterOnReadTest, Get) { } Init(); - ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); - - int expected = 0; + // In Init(), compaction may request tokens for `Env::IO_USER`. + int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER); for (int i = 0; i < kNumFiles; ++i) { { std::string value; @@ -146,7 +145,8 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) { } Init(); - ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + // In Init(), compaction may request tokens for `Env::IO_USER`. + int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER); const int kNumKeys = kNumFiles * kNumKeysPerFile; { @@ -166,7 +166,7 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) { ASSERT_TRUE(statuses[i].IsNotSupported()); } } - ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } TEST_P(DBRateLimiterOnReadTest, OldMultiGet) { @@ -177,10 +177,10 @@ TEST_P(DBRateLimiterOnReadTest, OldMultiGet) { } Init(); - ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + // In Init(), compaction may request tokens for `Env::IO_USER`. + int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER); const int kNumKeys = kNumFiles * kNumKeysPerFile; - int expected = 0; { std::vector key_bufs; key_bufs.reserve(kNumKeys); @@ -207,10 +207,10 @@ TEST_P(DBRateLimiterOnReadTest, Iterator) { } Init(); + // In Init(), compaction may request tokens for `Env::IO_USER`. + int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER); std::unique_ptr iter(db_->NewIterator(GetReadOptions())); - ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); - int expected = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ++expected; ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); @@ -236,12 +236,12 @@ TEST_P(DBRateLimiterOnReadTest, VerifyChecksum) { return; } Init(); - - ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + // In Init(), compaction may request tokens for `Env::IO_USER`. + int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER); ASSERT_OK(db_->VerifyChecksum(GetReadOptions())); // The files are tiny so there should have just been one read per file. - int expected = kNumFiles; + expected += kNumFiles; ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } @@ -251,11 +251,12 @@ TEST_P(DBRateLimiterOnReadTest, VerifyFileChecksums) { } Init(); - ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + // In Init(), compaction may request tokens for `Env::IO_USER`. + int64_t expected = options_.rate_limiter->GetTotalRequests(Env::IO_USER); ASSERT_OK(db_->VerifyFileChecksums(GetReadOptions())); // The files are tiny so there should have just been one read per file. - int expected = kNumFiles; + expected += kNumFiles; ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); } diff --git a/db/db_test2.cc b/db/db_test2.cc index 8cde53f9c..143f5835c 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -3987,12 +3987,14 @@ TEST_F(DBTest2, RateLimitedCompactionReads) { // should be slightly above 512KB due to non-data blocks read. Arbitrarily // chose 1MB as the upper bound on the total bytes read. - size_t rate_limited_bytes = - options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL); - // There must be no charges at non-`IO_LOW` priorities. + size_t rate_limited_bytes = static_cast( + options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL)); + // The charges can exist for `IO_LOW` and `IO_USER` priorities. + size_t rate_limited_bytes_by_pri = + options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) + + options.rate_limiter->GetTotalBytesThrough(Env::IO_USER); ASSERT_EQ(rate_limited_bytes, - static_cast( - options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW))); + static_cast(rate_limited_bytes_by_pri)); // Include the explicit prefetch of the footer in direct I/O case. size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0; ASSERT_GE( @@ -4010,9 +4012,11 @@ TEST_F(DBTest2, RateLimitedCompactionReads) { } delete iter; // bytes read for user iterator shouldn't count against the rate limit. + rate_limited_bytes_by_pri = + options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) + + options.rate_limiter->GetTotalBytesThrough(Env::IO_USER); ASSERT_EQ(rate_limited_bytes, - static_cast( - options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW))); + static_cast(rate_limited_bytes_by_pri)); } } } diff --git a/file/file_util.h b/file/file_util.h index f7900ba40..d46a7ba0e 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -78,6 +78,8 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, (!opts.timeout.count() || ro.io_timeout < opts.timeout)) { opts.timeout = ro.io_timeout; } + + opts.rate_limiter_priority = ro.rate_limiter_priority; return IOStatus::OK(); } diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 55c156f40..f3c24d1bd 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -172,8 +172,11 @@ class RandomAccessFileReader { size_t num_reqs, AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const; - IOStatus Prefetch(uint64_t offset, size_t n) const { - return file_->Prefetch(offset, n, IOOptions(), nullptr); + IOStatus Prefetch(uint64_t offset, size_t n, + const Env::IOPriority rate_limiter_priority) const { + IOOptions opts; + opts.rate_limiter_priority = rate_limiter_priority; + return file_->Prefetch(offset, n, opts, nullptr); } FSRandomAccessFile* file() { return file_.get(); } diff --git a/table/block_based/binary_search_index_reader.cc b/table/block_based/binary_search_index_reader.cc index 12469eadb..21787cc1a 100644 --- a/table/block_based/binary_search_index_reader.cc +++ b/table/block_based/binary_search_index_reader.cc @@ -47,7 +47,8 @@ InternalIteratorBase* BinarySearchIndexReader::NewIterator( const bool no_io = (read_options.read_tier == kBlockCacheTier); CachableEntry index_block; const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority, + get_context, lookup_context, &index_block); if (!s.ok()) { if (iter != nullptr) { iter->Invalidate(s); diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index 75b595cc1..7daf47204 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -234,7 +234,7 @@ void BlockBasedTableIterator::InitDataBlock() { // Enabled from the very first IO when ReadOptions.readahead_size is set. block_prefetcher_.PrefetchIfNeeded( rep, data_block_handle, read_options_.readahead_size, is_for_compaction, - read_options_.async_io); + read_options_.async_io, read_options_.rate_limiter_priority); Status s; table_->NewDataBlockIterator( read_options_, data_block_handle, &block_iter_, BlockType::kData, diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 0ce8a008b..4d53032a4 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -561,7 +561,7 @@ Status BlockBasedTable::Open( Footer footer; std::unique_ptr prefetch_buffer; - // Only retain read_options.deadline and read_options.io_timeout. + // From read_options, retain deadline, io_timeout, and rate_limiter_priority. // In future, we may retain more // options. Specifically, w ignore verify_checksums and default to // checksum verification anyway when creating the index and filter @@ -569,6 +569,7 @@ Status BlockBasedTable::Open( ReadOptions ro; ro.deadline = read_options.deadline; ro.io_timeout = read_options.io_timeout; + ro.rate_limiter_priority = read_options.rate_limiter_priority; // prefetch both index and filters, down to all partitions const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0; @@ -766,7 +767,8 @@ Status BlockBasedTable::PrefetchTail( // Try file system prefetch if (!file->use_direct_io() && !force_direct_prefetch) { - if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) { + if (!file->Prefetch(prefetch_off, prefetch_len, ro.rate_limiter_priority) + .IsNotSupported()) { prefetch_buffer->reset(new FilePrefetchBuffer( 0 /* readahead_size */, 0 /* max_readahead_size */, false /* enable */, true /* track_min_offset */)); @@ -778,6 +780,7 @@ Status BlockBasedTable::PrefetchTail( prefetch_buffer->reset( new FilePrefetchBuffer(0 /* readahead_size */, 0 /* max_readahead_size */, true /* enable */, true /* track_min_offset */)); + IOOptions opts; Status s = file->PrepareIOOptions(ro, opts); if (s.ok()) { diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc index c4a637550..f7c7b94fe 100644 --- a/table/block_based/block_prefetcher.cc +++ b/table/block_based/block_prefetcher.cc @@ -8,13 +8,14 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "table/block_based/block_prefetcher.h" +#include "rocksdb/file_system.h" #include "table/block_based/block_based_table_reader.h" namespace ROCKSDB_NAMESPACE { -void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep, - const BlockHandle& handle, - size_t readahead_size, - bool is_for_compaction, bool async_io) { +void BlockPrefetcher::PrefetchIfNeeded( + const BlockBasedTable::Rep* rep, const BlockHandle& handle, + const size_t readahead_size, bool is_for_compaction, const bool async_io, + const Env::IOPriority rate_limiter_priority) { if (is_for_compaction) { rep->CreateFilePrefetchBufferIfNotExists( compaction_readahead_size_, compaction_readahead_size_, @@ -84,7 +85,8 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep, // we can fallback to reading from disk if Prefetch fails. Status s = rep->file->Prefetch( handle.offset(), - BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_); + BlockBasedTable::BlockSizeWithTrailer(handle) + readahead_size_, + rate_limiter_priority); if (s.IsNotSupported()) { rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_, max_auto_readahead_size, diff --git a/table/block_based/block_prefetcher.h b/table/block_based/block_prefetcher.h index e7c11532a..285903511 100644 --- a/table/block_based/block_prefetcher.h +++ b/table/block_based/block_prefetcher.h @@ -20,7 +20,8 @@ class BlockPrefetcher { void PrefetchIfNeeded(const BlockBasedTable::Rep* rep, const BlockHandle& handle, size_t readahead_size, - bool is_for_compaction, bool async_io); + bool is_for_compaction, bool async_io, + Env::IOPriority rate_limiter_priority); FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); } void UpdateReadPattern(const uint64_t& offset, const size_t& len) { diff --git a/table/block_based/hash_index_reader.cc b/table/block_based/hash_index_reader.cc index 4d8544161..839c79dd9 100644 --- a/table/block_based/hash_index_reader.cc +++ b/table/block_based/hash_index_reader.cc @@ -117,7 +117,8 @@ InternalIteratorBase* HashIndexReader::NewIterator( const bool no_io = (read_options.read_tier == kBlockCacheTier); CachableEntry index_block; const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority, + get_context, lookup_context, &index_block); if (!s.ok()) { if (iter != nullptr) { iter->Invalidate(s); diff --git a/table/block_based/index_reader_common.cc b/table/block_based/index_reader_common.cc index 275ae56dc..58fdfe4b6 100644 --- a/table/block_based/index_reader_common.cc +++ b/table/block_based/index_reader_common.cc @@ -33,7 +33,7 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock( } Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock( - bool no_io, GetContext* get_context, + bool no_io, Env::IOPriority rate_limiter_priority, GetContext* get_context, BlockCacheLookupContext* lookup_context, CachableEntry* index_block) const { assert(index_block != nullptr); @@ -44,6 +44,7 @@ Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock( } ReadOptions read_options; + read_options.rate_limiter_priority = rate_limiter_priority; if (no_io) { read_options.read_tier = kBlockCacheTier; } diff --git a/table/block_based/index_reader_common.h b/table/block_based/index_reader_common.h index 71174a7d3..a0f268ad8 100644 --- a/table/block_based/index_reader_common.h +++ b/table/block_based/index_reader_common.h @@ -66,7 +66,8 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader { return table_->get_rep()->table_options.cache_index_and_filter_blocks; } - Status GetOrReadIndexBlock(bool no_io, GetContext* get_context, + Status GetOrReadIndexBlock(bool no_io, Env::IOPriority rate_limiter_priority, + GetContext* get_context, BlockCacheLookupContext* lookup_context, CachableEntry* index_block) const; diff --git a/table/block_based/partitioned_index_iterator.cc b/table/block_based/partitioned_index_iterator.cc index 15a3aac87..3e93787a9 100644 --- a/table/block_based/partitioned_index_iterator.cc +++ b/table/block_based/partitioned_index_iterator.cc @@ -91,7 +91,8 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() { // Enabled from the very first IO when ReadOptions.readahead_size is set. block_prefetcher_.PrefetchIfNeeded( rep, partitioned_index_handle, read_options_.readahead_size, - is_for_compaction, read_options_.async_io); + is_for_compaction, read_options_.async_io, + read_options_.rate_limiter_priority); Status s; table_->NewDataBlockIterator( read_options_, partitioned_index_handle, &block_iter_, diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc index 25ea1a3a4..13f732ab7 100644 --- a/table/block_based/partitioned_index_reader.cc +++ b/table/block_based/partitioned_index_reader.cc @@ -49,7 +49,8 @@ InternalIteratorBase* PartitionIndexReader::NewIterator( const bool no_io = (read_options.read_tier == kBlockCacheTier); CachableEntry index_block; const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + GetOrReadIndexBlock(no_io, read_options.rate_limiter_priority, + get_context, lookup_context, &index_block); if (!s.ok()) { if (iter != nullptr) { iter->Invalidate(s); @@ -82,6 +83,7 @@ InternalIteratorBase* PartitionIndexReader::NewIterator( ro.io_timeout = read_options.io_timeout; ro.adaptive_readahead = read_options.adaptive_readahead; ro.async_io = read_options.async_io; + ro.rate_limiter_priority = read_options.rate_limiter_priority; // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. @@ -119,8 +121,9 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro, CachableEntry index_block; { - Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */, - &lookup_context, &index_block); + Status s = GetOrReadIndexBlock(false /* no_io */, ro.rate_limiter_priority, + nullptr /* get_context */, &lookup_context, + &index_block); if (!s.ok()) { return s; } diff --git a/table/format.cc b/table/format.cc index e7720e901..eb42bd1fc 100644 --- a/table/format.cc +++ b/table/format.cc @@ -372,17 +372,17 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, // TODO: rate limit footer reads. if (prefetch_buffer == nullptr || !prefetch_buffer->TryReadFromCache( - IOOptions(), file, read_offset, Footer::kMaxEncodedLength, - &footer_input, nullptr, Env::IO_TOTAL /* rate_limiter_priority */)) { + opts, file, read_offset, Footer::kMaxEncodedLength, &footer_input, + nullptr, opts.rate_limiter_priority)) { if (file->use_direct_io()) { s = file->Read(opts, read_offset, Footer::kMaxEncodedLength, &footer_input, nullptr, &internal_buf, - Env::IO_TOTAL /* rate_limiter_priority */); + opts.rate_limiter_priority); } else { footer_buf.reserve(Footer::kMaxEncodedLength); s = file->Read(opts, read_offset, Footer::kMaxEncodedLength, &footer_input, &footer_buf[0], nullptr, - Env::IO_TOTAL /* rate_limiter_priority */); + opts.rate_limiter_priority); } if (!s.ok()) return s; }