[WIP] Set rate limiter priority dynamically and pass it to FS if needed

This commit is contained in:
bowang 2022-05-12 14:07:15 -07:00
parent e96e8e2d05
commit c19ac641ea
6 changed files with 134 additions and 50 deletions

View File

@ -2285,7 +2285,8 @@ Status CompactionJob::OpenCompactionOutputFile(
/*enable_hash=*/paranoid_file_checks_); /*enable_hash=*/paranoid_file_checks_);
} }
writable_file->SetIOPriority(Env::IOPriority::IO_LOW); writable_file->SetIOPriority(
GetRateLimiterPriority(RateLimiter::OpType::kWrite));
writable_file->SetWriteLifeTimeHint(write_hint_); writable_file->SetWriteLifeTimeHint(write_hint_);
FileTypeSet tmp_set = db_options_.checksum_handoff_file_types; FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
writable_file->SetPreallocationBlockSize(static_cast<size_t>( writable_file->SetPreallocationBlockSize(static_cast<size_t>(
@ -2476,6 +2477,27 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) {
file_number, compact_->compaction->output_path_id()); file_number, compact_->compaction->output_path_id());
} }
// Chooses the rate limiter priority (io_priority) for this compaction's I/O
// based on the current state of the write controller obtained from the
// column family set:
//   - Env::IO_USER when writes are stopped,
//   - Env::IO_MID  when writes are being delayed,
//   - Env::IO_LOW  otherwise.
//
// Reads and writes currently share the same policy, so `op_type` does not
// influence the result; the parameter is kept so callers can state the
// operation they are about to perform.
Env::IOPriority CompactionJob::GetRateLimiterPriority(
    const RateLimiter::OpType op_type) {
  (void)op_type;  // Same policy for every op type (avoids an unused warning).
  WriteController* write_controller =
      versions_->GetColumnFamilySet()->write_controller();
  // Test the stopped state first: it is the more severe condition, and it
  // must not be masked by a concurrent delay, which would otherwise yield
  // the lower IO_MID priority.
  if (UNLIKELY(write_controller->IsStopped())) {
    return Env::IO_USER;
  }
  if (UNLIKELY(write_controller->NeedsDelay())) {
    return Env::IO_MID;
  }
  return Env::IO_LOW;
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
std::string CompactionServiceCompactionJob::GetTableFileName( std::string CompactionServiceCompactionJob::GetTableFileName(
uint64_t file_number) { uint64_t file_number) {

View File

@ -234,6 +234,8 @@ class CompactionJob {
// Get table file name in where it's outputting to, which should also be in // Get table file name in where it's outputting to, which should also be in
// `output_directory_`. // `output_directory_`.
virtual std::string GetTableFileName(uint64_t file_number); virtual std::string GetTableFileName(uint64_t file_number);
// Determines, at call time, the rate limiter priority (io_priority) to use
// for this compaction's I/O of the given `op_type`. The result depends on
// whether the write controller currently reports writes as delayed or
// stopped; when neither holds, the priority is Env::IO_LOW.
Env::IOPriority GetRateLimiterPriority(const RateLimiter::OpType op_type);
}; };
// CompactionServiceInput is used to pass compaction information between two // CompactionServiceInput is used to pass compaction information between two

View File

@ -809,6 +809,10 @@ Status FlushJob::WriteLevel0Table() {
{ {
auto write_hint = cfd_->CalculateSSTWriteHint(0); auto write_hint = cfd_->CalculateSSTWriteHint(0);
// https://fburl.com/code/zxh9nif4 : All of WriteController functions
// are to be called while holding DB mutex.
Env::IOPriority io_priority =
GetRateLimiterPriority(RateLimiter::OpType::kWrite);
db_mutex_->Unlock(); db_mutex_->Unlock();
if (log_buffer_) { if (log_buffer_) {
log_buffer_->FlushBufferToLog(); log_buffer_->FlushBufferToLog();
@ -924,7 +928,7 @@ Status FlushJob::WriteLevel0Table() {
snapshot_checker_, mutable_cf_options_.paranoid_file_checks, snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), &io_s, io_tracer_, cfd_->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id, BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, write_hint, full_history_ts_low, io_priority, &table_properties_, write_hint, full_history_ts_low,
blob_callback_, &num_input_entries, &memtable_payload_bytes, blob_callback_, &num_input_entries, &memtable_payload_bytes,
&memtable_garbage_bytes); &memtable_garbage_bytes);
// TODO: Cleanup io_status in BuildTable and table builders // TODO: Cleanup io_status in BuildTable and table builders
@ -1064,6 +1068,23 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
return info; return info;
} }
// Chooses the rate limiter priority (io_priority) for this flush's I/O.
//
// For writes, the priority is raised to Env::IO_USER while the write
// controller reports that writes are stopped or delayed; otherwise it is
// Env::IO_HIGH. Every other op type gets Env::IO_USER.
Env::IOPriority FlushJob::GetRateLimiterPriority(
    const RateLimiter::OpType op_type) {
  WriteController* write_controller =
      versions_->GetColumnFamilySet()->write_controller();
  if (op_type != RateLimiter::OpType::kWrite) {
    return Env::IO_USER;
  }
  const bool writes_stalled = UNLIKELY(write_controller->IsStopped()) ||
                              UNLIKELY(write_controller->NeedsDelay());
  return writes_stalled ? Env::IO_USER : Env::IO_HIGH;
}
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -123,6 +123,8 @@ class FlushJob {
bool MemPurgeDecider(); bool MemPurgeDecider();
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const; std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
// Determines, at call time, the rate limiter priority (io_priority) to use
// for this flush's I/O of the given `op_type`. For writes the result is
// Env::IO_USER while the write controller reports writes as stopped or
// delayed and Env::IO_HIGH otherwise; other op types get Env::IO_USER.
Env::IOPriority GetRateLimiterPriority(const RateLimiter::OpType op_type);
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
const std::string& dbname_; const std::string& dbname_;

View File

@ -54,10 +54,12 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
UpdateFileChecksum(data); UpdateFileChecksum(data);
{ {
std::unique_ptr<IOOptions> io_options;
UpdateIOOptionsIfNeeded(io_options, op_rate_limiter_priority);
IOSTATS_TIMER_GUARD(prepare_write_nanos); IOSTATS_TIMER_GUARD(prepare_write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left, writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
IOOptions(), nullptr); *io_options, nullptr);
} }
// See whether we need to enlarge the buffer to avoid the flush // See whether we need to enlarge the buffer to avoid the flush
@ -211,6 +213,8 @@ IOStatus WritableFileWriter::Close() {
s = Flush(); // flush cache to OS s = Flush(); // flush cache to OS
IOStatus interim; IOStatus interim;
IOOptions io_options;
io_options.rate_limiter_priority = writable_file_->GetIOPriority();
// In direct I/O mode we write whole pages so // In direct I/O mode we write whole pages so
// we need to let the file know where data ends. // we need to let the file know where data ends.
if (use_direct_io()) { if (use_direct_io()) {
@ -221,7 +225,7 @@ IOStatus WritableFileWriter::Close() {
start_ts = FileOperationInfo::StartNow(); start_ts = FileOperationInfo::StartNow();
} }
#endif #endif
interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr); interim = writable_file_->Truncate(filesize_, io_options, nullptr);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow(); auto finish_ts = FileOperationInfo::FinishNow();
@ -241,7 +245,7 @@ IOStatus WritableFileWriter::Close() {
start_ts = FileOperationInfo::StartNow(); start_ts = FileOperationInfo::StartNow();
} }
#endif #endif
interim = writable_file_->Fsync(IOOptions(), nullptr); interim = writable_file_->Fsync(io_options, nullptr);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow(); auto finish_ts = FileOperationInfo::FinishNow();
@ -267,7 +271,7 @@ IOStatus WritableFileWriter::Close() {
start_ts = FileOperationInfo::StartNow(); start_ts = FileOperationInfo::StartNow();
} }
#endif #endif
interim = writable_file_->Close(IOOptions(), nullptr); interim = writable_file_->Close(io_options, nullptr);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow(); auto finish_ts = FileOperationInfo::FinishNow();
@ -331,7 +335,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
start_ts = FileOperationInfo::StartNow(); start_ts = FileOperationInfo::StartNow();
} }
#endif #endif
s = writable_file_->Flush(IOOptions(), nullptr); std::unique_ptr<IOOptions> io_options;
UpdateIOOptionsIfNeeded(io_options, op_rate_limiter_priority);
s = writable_file_->Flush(*io_options, nullptr);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now(); auto finish_ts = std::chrono::steady_clock::now();
@ -435,10 +441,12 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
start_ts = FileOperationInfo::StartNow(); start_ts = FileOperationInfo::StartNow();
} }
#endif #endif
IOOptions io_options;
io_options.rate_limiter_priority = writable_file_->GetIOPriority();
if (use_fsync) { if (use_fsync) {
s = writable_file_->Fsync(IOOptions(), nullptr); s = writable_file_->Fsync(io_options, nullptr);
} else { } else {
s = writable_file_->Sync(IOOptions(), nullptr); s = writable_file_->Sync(io_options, nullptr);
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
@ -466,7 +474,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
start_ts = FileOperationInfo::StartNow(); start_ts = FileOperationInfo::StartNow();
} }
#endif #endif
IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr); IOOptions io_options;
io_options.rate_limiter_priority = writable_file_->GetIOPriority();
IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now(); auto finish_ts = std::chrono::steady_clock::now();
@ -490,19 +500,21 @@ IOStatus WritableFileWriter::WriteBuffered(
size_t left = size; size_t left = size;
DataVerificationInfo v_info; DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)]; char checksum_buf[sizeof(uint32_t)];
IOOptions io_options;
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
while (left > 0) { while (left > 0) {
size_t allowed; size_t allowed = left;
Env::IOPriority rate_limiter_priority_used = if (rate_limiter_priority_used != Env::IO_TOTAL) {
WritableFileWriter::DecideRateLimiterPriority( if (rate_limiter_ != nullptr) {
writable_file_->GetIOPriority(), op_rate_limiter_priority); allowed = rate_limiter_->RequestToken(
if (rate_limiter_ != nullptr && left, 0 /* alignment */, rate_limiter_priority_used, stats_,
rate_limiter_priority_used != Env::IO_TOTAL) { RateLimiter::OpType::kWrite);
allowed = rate_limiter_->RequestToken(left, 0 /* alignment */, } else {
rate_limiter_priority_used, stats_, io_options.rate_limiter_priority = rate_limiter_priority_used;
RateLimiter::OpType::kWrite); }
} else {
allowed = left;
} }
{ {
@ -511,7 +523,7 @@ IOStatus WritableFileWriter::WriteBuffered(
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint start_ts; FileOperationInfo::StartTimePoint start_ts;
uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr);
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow(); start_ts = FileOperationInfo::StartNow();
old_size = next_write_offset_; old_size = next_write_offset_;
@ -524,10 +536,10 @@ IOStatus WritableFileWriter::WriteBuffered(
if (perform_data_verification_) { if (perform_data_verification_) {
Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf); Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info, s = writable_file_->Append(Slice(src, allowed), io_options, v_info,
nullptr); nullptr);
} else { } else {
s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr); s = writable_file_->Append(Slice(src, allowed), io_options, nullptr);
} }
if (!s.ok()) { if (!s.ok()) {
// If writable_file_->Append() failed, then the data may or may not // If writable_file_->Append() failed, then the data may or may not
@ -579,6 +591,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
size_t left = size; size_t left = size;
DataVerificationInfo v_info; DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)]; char checksum_buf[sizeof(uint32_t)];
IOOptions io_options;
// Check how much is allowed. Here, we loop until the rate limiter allows to // Check how much is allowed. Here, we loop until the rate limiter allows to
// write the entire buffer. // write the entire buffer.
@ -588,13 +601,17 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
Env::IOPriority rate_limiter_priority_used = Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority( WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority); writable_file_->GetIOPriority(), op_rate_limiter_priority);
if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) { if (rate_limiter_priority_used != Env::IO_TOTAL) {
while (data_size > 0) { if (rate_limiter_ != nullptr) {
size_t tmp_size; while (data_size > 0) {
tmp_size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), size_t tmp_size;
rate_limiter_priority_used, stats_, tmp_size = rate_limiter_->RequestToken(
RateLimiter::OpType::kWrite); data_size, buf_.Alignment(), rate_limiter_priority_used, stats_,
data_size -= tmp_size; RateLimiter::OpType::kWrite);
data_size -= tmp_size;
}
} else {
io_options.rate_limiter_priority = rate_limiter_priority_used;
} }
} }
@ -604,7 +621,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint start_ts; FileOperationInfo::StartTimePoint start_ts;
uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr);
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow(); start_ts = FileOperationInfo::StartNow();
old_size = next_write_offset_; old_size = next_write_offset_;
@ -617,8 +634,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->Append(Slice(src, left), IOOptions(), v_info, s = writable_file_->Append(Slice(src, left), io_options, v_info, nullptr);
nullptr);
SetPerfLevel(prev_perf_level); SetPerfLevel(prev_perf_level);
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -709,20 +725,22 @@ IOStatus WritableFileWriter::WriteDirect(
size_t left = buf_.CurrentSize(); size_t left = buf_.CurrentSize();
DataVerificationInfo v_info; DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)]; char checksum_buf[sizeof(uint32_t)];
IOOptions io_options;
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
while (left > 0) { while (left > 0) {
// Check how much is allowed // Check how much is allowed
size_t size; size_t size = left;
Env::IOPriority rate_limiter_priority_used = if (rate_limiter_priority_used != Env::IO_TOTAL) {
WritableFileWriter::DecideRateLimiterPriority( if (rate_limiter_ != nullptr) {
writable_file_->GetIOPriority(), op_rate_limiter_priority); size = rate_limiter_->RequestToken(left, buf_.Alignment(),
if (rate_limiter_ != nullptr && writable_file_->GetIOPriority(),
rate_limiter_priority_used != Env::IO_TOTAL) { stats_, RateLimiter::OpType::kWrite);
size = rate_limiter_->RequestToken(left, buf_.Alignment(), } else {
writable_file_->GetIOPriority(), io_options.rate_limiter_priority = rate_limiter_priority_used;
stats_, RateLimiter::OpType::kWrite); }
} else {
size = left;
} }
{ {
@ -737,10 +755,10 @@ IOStatus WritableFileWriter::WriteDirect(
Crc32cHandoffChecksumCalculation(src, size, checksum_buf); Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->PositionedAppend(Slice(src, size), write_offset, s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
IOOptions(), v_info, nullptr); io_options, v_info, nullptr);
} else { } else {
s = writable_file_->PositionedAppend(Slice(src, size), write_offset, s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
IOOptions(), nullptr); io_options, nullptr);
} }
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
@ -809,6 +827,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
size_t left = buf_.CurrentSize(); size_t left = buf_.CurrentSize();
DataVerificationInfo v_info; DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)]; char checksum_buf[sizeof(uint32_t)];
IOOptions io_options;
// Check how much is allowed. Here, we loop until the rate limiter allows to // Check how much is allowed. Here, we loop until the rate limiter allows to
// write the entire buffer. // write the entire buffer.
@ -818,13 +837,16 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
Env::IOPriority rate_limiter_priority_used = Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority( WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority); writable_file_->GetIOPriority(), op_rate_limiter_priority);
if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
while (data_size > 0) { if (rate_limiter_priority_used != Env::IO_TOTAL) {
if (rate_limiter_ != nullptr) {
size_t size; size_t size;
size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
writable_file_->GetIOPriority(), writable_file_->GetIOPriority(),
stats_, RateLimiter::OpType::kWrite); stats_, RateLimiter::OpType::kWrite);
data_size -= size; data_size -= size;
} else {
io_options.rate_limiter_priority = rate_limiter_priority_used;
} }
} }
@ -839,7 +861,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->PositionedAppend(Slice(src, left), write_offset, s = writable_file_->PositionedAppend(Slice(src, left), write_offset,
IOOptions(), v_info, nullptr); io_options, v_info, nullptr);
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now(); auto finish_ts = std::chrono::steady_clock::now();
@ -894,4 +916,15 @@ Env::IOPriority WritableFileWriter::DecideRateLimiterPriority(
return op_rate_limiter_priority; return op_rate_limiter_priority;
} }
} }
// Stores the effective rate limiter priority in
// `io_options->rate_limiter_priority` so that it can be handed to the file
// system along with the I/O request.
//
// This only applies when the writer has no rate limiter of its own
// (`rate_limiter_ == nullptr`). The effective priority is whatever
// DecideRateLimiterPriority() returns for the file's priority and
// `op_rate_limiter_priority`; Env::IO_TOTAL is not propagated and leaves
// `io_options` untouched.
//
// `io_options` must hold a valid IOOptions object. An empty pointer is
// tolerated here (nothing is written), but callers that go on to dereference
// it must allocate it first.
void WritableFileWriter::UpdateIOOptionsIfNeeded(
    const std::unique_ptr<IOOptions>& io_options,
    const Env::IOPriority op_rate_limiter_priority) {
  if (io_options == nullptr) {
    // Nothing to update; avoid dereferencing an empty unique_ptr.
    return;
  }
  if (rate_limiter_ != nullptr) {
    // The writer has its own rate limiter, so there is no priority to pass
    // down to the file system.
    return;
  }
  const Env::IOPriority rate_limiter_priority_used =
      WritableFileWriter::DecideRateLimiterPriority(
          writable_file_->GetIOPriority(), op_rate_limiter_priority);
  if (rate_limiter_priority_used != Env::IO_TOTAL) {
    io_options->rate_limiter_priority = rate_limiter_priority_used;
  }
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -281,6 +281,10 @@ class WritableFileWriter {
Env::IOPriority writable_file_io_priority, Env::IOPriority writable_file_io_priority,
Env::IOPriority op_rate_limiter_priority); Env::IOPriority op_rate_limiter_priority);
// If rate_limiter_ is nullptr, the rate_limiter_priority may be updated.
// The value written to `io_options->rate_limiter_priority` is derived from
// the file's IO priority and `op_rate_limiter_priority` via
// DecideRateLimiterPriority(); Env::IO_TOTAL is never written.
// `io_options` is dereferenced, so it should point to a valid IOOptions.
void UpdateIOOptionsIfNeeded(const std::unique_ptr<IOOptions>& io_options,
const Env::IOPriority op_rate_limiter_priority);
// Used when os buffering is OFF and we are writing // Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode // DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE