diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index d68d97960..48a668f30 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -2285,7 +2285,8 @@ Status CompactionJob::OpenCompactionOutputFile( /*enable_hash=*/paranoid_file_checks_); } - writable_file->SetIOPriority(Env::IOPriority::IO_LOW); + writable_file->SetIOPriority( + GetRateLimiterPriority(RateLimiter::OpType::kWrite)); writable_file->SetWriteLifeTimeHint(write_hint_); FileTypeSet tmp_set = db_options_.checksum_handoff_file_types; writable_file->SetPreallocationBlockSize(static_cast( @@ -2476,6 +2477,30 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) { file_number, compact_->compaction->output_path_id()); } +Env::IOPriority CompactionJob::GetRateLimiterPriority( + const RateLimiter::OpType op_type) { + WriteController* write_controller = + versions_->GetColumnFamilySet()->write_controller(); + // TODO: if all priorities are the same for read and write, update this. + if (op_type == RateLimiter::OpType::kWrite) { + if (write_controller->NeedsDelay() || write_controller->IsStopped()) { + return Env::IO_USER; + } else if (write_controller->NeedSpeedupCompaction()) { + return Env::IO_HIGH; + } + + return Env::IO_LOW; + } else { + if (write_controller->NeedsDelay()|| (write_controller->IsStopped()) { + return Env::IO_USER; + } else if (write_controller->NeedSpeedupCompaction()) { + return Env::IO_HIGH; + } + + return Env::IO_LOW; + } +} + #ifndef ROCKSDB_LITE std::string CompactionServiceCompactionJob::GetTableFileName( uint64_t file_number) { diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 24a77c679..98d35140a 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -234,6 +234,8 @@ class CompactionJob { // Get table file name in where it's outputting to, which should also be in // `output_directory_`. virtual std::string GetTableFileName(uint64_t file_number); + // The rate limiter priority (io_priority) is determined dynamically here. + Env::IOPriority GetRateLimiterPriority(const RateLimiter::OpType op_type); }; // CompactionServiceInput is used the pass compaction information between two diff --git a/db/flush_job.cc b/db/flush_job.cc index 66e198f74..db1f401fd 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -809,6 +809,10 @@ Status FlushJob::WriteLevel0Table() { { auto write_hint = cfd_->CalculateSSTWriteHint(0); + // https://fburl.com/code/zxh9nif4 : All of WriteController functions + // are to be called while holding DB mutex. + Env::IOPriority io_priority = + GetRateLimiterPriority(RateLimiter::OpType::kWrite); db_mutex_->Unlock(); if (log_buffer_) { log_buffer_->FlushBufferToLog(); @@ -924,7 +928,7 @@ Status FlushJob::WriteLevel0Table() { snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, write_hint, full_history_ts_low, + io_priority, &table_properties_, write_hint, full_history_ts_low, blob_callback_, &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes); // TODO: Cleanup io_status in BuildTable and table builders @@ -1064,6 +1068,22 @@ std::unique_ptr FlushJob::GetFlushJobInfo() const { return info; } +Env::IOPriority FlushJob::GetRateLimiterPriority( + const RateLimiter::OpType op_type) { + WriteController* write_controller = + versions_->GetColumnFamilySet()->write_controller(); + + if (op_type == RateLimiter::OpType::kWrite) { + if (write_controller->IsStopped() || write_controller->NeedsDelay()) { + return Env::IO_USER; + } + return Env::IO_HIGH; + } else { + return Env::IO_USER; + } +} +} + #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/flush_job.h b/db/flush_job.h index 76d5e34b6..d0e77dff3 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -123,6 +123,8 @@ class FlushJob { bool MemPurgeDecider(); #ifndef ROCKSDB_LITE std::unique_ptr GetFlushJobInfo() const; + // The rate limiter priority (io_priority) is determined dynamically here. + Env::IOPriority GetRateLimiterPriority(const RateLimiter::OpType op_type); #endif // !ROCKSDB_LITE const std::string& dbname_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index e326db9e0..d71c0dfd6 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -658,6 +658,15 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) { ASSERT_TRUE(to_delete.empty()); } +TEST_F(FlushJobTimestampTest, GetRateLimiterPriority) { + // When the state from WriteController is normal. + ASSERT_EQ(GetRateLimiterPriority(RateLimiter::OpType::kWrite), Env::IO_HIGH); + + // When the state from WriteController is Delayed. + + // When the state from WriteController is Stopped. +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/write_controller.h b/db/write_controller.h index 88bd1417f..c32b70b94 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -52,7 +52,7 @@ class WriteController { bool IsStopped() const; bool NeedsDelay() const { return total_delayed_.load() > 0; } bool NeedSpeedupCompaction() const { - return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; + return IsStopped() || NeedsDelay() || total_compaction_pressure_.load() > 0; } // return how many microseconds the caller needs to sleep after the call // num_bytes: how many number of bytes to put into the DB. diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 84be9b689..c96b31d87 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -54,10 +54,12 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, UpdateFileChecksum(data); { + IOOptions io_options; + UpdateIOOptionsIfNeeded(io_options, op_rate_limiter_priority); IOSTATS_TIMER_GUARD(prepare_write_nanos); TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); writable_file_->PrepareWrite(static_cast(GetFileSize()), left, - IOOptions(), nullptr); + io_options, nullptr); } // See whether we need to enlarge the buffer to avoid the flush @@ -211,6 +213,8 @@ IOStatus WritableFileWriter::Close() { s = Flush(); // flush cache to OS IOStatus interim; + IOOptions io_options; + io_options.rate_limiter_priority = writable_file_->GetIOPriority(); // In direct I/O mode we write whole pages so // we need to let the file know where data ends. if (use_direct_io()) { @@ -221,7 +225,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr); + interim = writable_file_->Truncate(filesize_, io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -241,7 +245,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Fsync(IOOptions(), nullptr); + interim = writable_file_->Fsync(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -267,7 +271,7 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Close(IOOptions(), nullptr); + interim = writable_file_->Close(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); @@ -331,7 +335,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { start_ts = FileOperationInfo::StartNow(); } #endif - s = writable_file_->Flush(IOOptions(), nullptr); + IOOptions io_options; + UpdateIOOptionsIfNeeded(io_options, op_rate_limiter_priority); + s = writable_file_->Flush(io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -435,10 +441,12 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { start_ts = FileOperationInfo::StartNow(); } #endif + IOOptions io_options; + io_options.rate_limiter_priority = writable_file_->GetIOPriority(); if (use_fsync) { - s = writable_file_->Fsync(IOOptions(), nullptr); + s = writable_file_->Fsync(io_options, nullptr); } else { - s = writable_file_->Sync(IOOptions(), nullptr); + s = writable_file_->Sync(io_options, nullptr); } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { @@ -466,7 +474,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { start_ts = FileOperationInfo::StartNow(); } #endif - IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr); + IOOptions io_options; + io_options.rate_limiter_priority = writable_file_->GetIOPriority(); + IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -490,19 +500,21 @@ IOStatus WritableFileWriter::WriteBuffered( size_t left = size; DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); while (left > 0) { - size_t allowed; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); - if (rate_limiter_ != nullptr && - rate_limiter_priority_used != Env::IO_TOTAL) { - allowed = rate_limiter_->RequestToken(left, 0 /* alignment */, - rate_limiter_priority_used, stats_, - RateLimiter::OpType::kWrite); - } else { - allowed = left; + size_t allowed = left; + if (rate_limiter_priority_used != Env::IO_TOTAL) { + if (rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + left, 0 /* alignment */, rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); + } else { + io_options.rate_limiter_priority = rate_limiter_priority_used; + } } { @@ -511,7 +523,7 @@ IOStatus WritableFileWriter::WriteBuffered( #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; - uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); + uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr); if (ShouldNotifyListeners()) { start_ts = FileOperationInfo::StartNow(); old_size = next_write_offset_; @@ -524,10 +536,10 @@ IOStatus WritableFileWriter::WriteBuffered( if (perform_data_verification_) { Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); - s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info, + s = writable_file_->Append(Slice(src, allowed), io_options, v_info, nullptr); } else { - s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr); + s = writable_file_->Append(Slice(src, allowed), io_options, nullptr); } if (!s.ok()) { // If writable_file_->Append() failed, then the data may or may not @@ -579,6 +591,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( size_t left = size; DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; // Check how much is allowed. Here, we loop until the rate limiter allows to // write the entire buffer. @@ -588,13 +601,17 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( Env::IOPriority rate_limiter_priority_used = WritableFileWriter::DecideRateLimiterPriority( writable_file_->GetIOPriority(), op_rate_limiter_priority); - if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { - while (data_size > 0) { - size_t tmp_size; - tmp_size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), - rate_limiter_priority_used, stats_, - RateLimiter::OpType::kWrite); - data_size -= tmp_size; + if (rate_limiter_priority_used != Env::IO_TOTAL) { + if (rate_limiter_ != nullptr) { + while (data_size > 0) { + size_t tmp_size; + tmp_size = rate_limiter_->RequestToken( + data_size, buf_.Alignment(), rate_limiter_priority_used, stats_, + RateLimiter::OpType::kWrite); + data_size -= tmp_size; + } + } else { + io_options.rate_limiter_priority = rate_limiter_priority_used; } } @@ -604,7 +621,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint start_ts; - uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); + uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr); if (ShouldNotifyListeners()) { start_ts = FileOperationInfo::StartNow(); old_size = next_write_offset_; @@ -617,8 +634,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); - s = writable_file_->Append(Slice(src, left), IOOptions(), v_info, - nullptr); + s = writable_file_->Append(Slice(src, left), io_options, v_info, nullptr); SetPerfLevel(prev_perf_level); } #ifndef ROCKSDB_LITE @@ -709,20 +725,22 @@ IOStatus WritableFileWriter::WriteDirect( size_t left = buf_.CurrentSize(); DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); while (left > 0) { // Check how much is allowed - size_t size; - Env::IOPriority rate_limiter_priority_used = - WritableFileWriter::DecideRateLimiterPriority( - writable_file_->GetIOPriority(), op_rate_limiter_priority); - if (rate_limiter_ != nullptr && - rate_limiter_priority_used != Env::IO_TOTAL) { - size = rate_limiter_->RequestToken(left, buf_.Alignment(), - writable_file_->GetIOPriority(), - stats_, RateLimiter::OpType::kWrite); - } else { - size = left; + size_t size = left; + if (rate_limiter_priority_used != Env::IO_TOTAL) { + if (rate_limiter_ != nullptr) { + size = rate_limiter_->RequestToken(left, buf_.Alignment(), + writable_file_->GetIOPriority(), + stats_, RateLimiter::OpType::kWrite); + } else { + io_options.rate_limiter_priority = rate_limiter_priority_used; + } } { @@ -737,10 +755,10 @@ IOStatus WritableFileWriter::WriteDirect( Crc32cHandoffChecksumCalculation(src, size, checksum_buf); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); s = writable_file_->PositionedAppend(Slice(src, size), write_offset, - IOOptions(), v_info, nullptr); + io_options, v_info, nullptr); } else { s = writable_file_->PositionedAppend(Slice(src, size), write_offset, - IOOptions(), nullptr); + io_options, nullptr); } if (ShouldNotifyListeners()) { @@ -809,6 +827,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( size_t left = buf_.CurrentSize(); DataVerificationInfo v_info; char checksum_buf[sizeof(uint32_t)]; + IOOptions io_options; // Check how much is allowed. Here, we loop until the rate limiter allows to // write the entire buffer. @@ -818,13 +837,16 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( Env::IOPriority rate_limiter_priority_used = WritableFileWriter::DecideRateLimiterPriority( writable_file_->GetIOPriority(), op_rate_limiter_priority); - if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { - while (data_size > 0) { + + if (rate_limiter_priority_used != Env::IO_TOTAL) { + if (rate_limiter_ != nullptr) { size_t size; size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), writable_file_->GetIOPriority(), stats_, RateLimiter::OpType::kWrite); data_size -= size; + } else { + io_options.rate_limiter_priority = rate_limiter_priority_used; } } @@ -839,7 +861,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); s = writable_file_->PositionedAppend(Slice(src, left), write_offset, - IOOptions(), v_info, nullptr); + io_options, v_info, nullptr); if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -894,4 +916,14 @@ Env::IOPriority WritableFileWriter::DecideRateLimiterPriority( return op_rate_limiter_priority; } } + +void WritableFileWriter::UpdateIOOptionsIfNeeded( + IOOptions& io_options, const Env::IOPriority op_rate_limiter_priority) { + Env::IOPriority rate_limiter_priority_used = + WritableFileWriter::DecideRateLimiterPriority( + writable_file_->GetIOPriority(), op_rate_limiter_priority); + if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { + io_options.rate_limiter_priority = rate_limiter_priority_used; + } +} } // namespace ROCKSDB_NAMESPACE diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index f9f6e5bd0..ee2bb5e6c 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -281,6 +281,10 @@ class WritableFileWriter { Env::IOPriority writable_file_io_priority, Env::IOPriority op_rate_limiter_priority); + // If rate_limiter_ is nullptr, the rate_limiter_priority may be updated. + void UpdateIOOptionsIfNeeded(IOOptions& io_options, + const Env::IOPriority op_rate_limiter_priority); + // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode #ifndef ROCKSDB_LITE