[WIP] Set rate limiter priority dynamically and pass it to FS if needed

This commit is contained in:
bowang 2022-05-12 14:07:15 -07:00
parent e96e8e2d05
commit 957494150f
8 changed files with 145 additions and 51 deletions

View File

@ -2285,7 +2285,8 @@ Status CompactionJob::OpenCompactionOutputFile(
/*enable_hash=*/paranoid_file_checks_);
}
writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
writable_file->SetIOPriority(
GetRateLimiterPriority(RateLimiter::OpType::kWrite));
writable_file->SetWriteLifeTimeHint(write_hint_);
FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
@ -2476,6 +2477,30 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) {
file_number, compact_->compaction->output_path_id());
}
Env::IOPriority CompactionJob::GetRateLimiterPriority(
const RateLimiter::OpType op_type) {
WriteController* write_controller =
versions_->GetColumnFamilySet()->write_controller();
// TODO: if all priorities are the same for read and write, update this.
if (op_type == RateLimiter::OpType::kWrite) {
if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
return Env::IO_USER;
} else if (write_controller->NeedSpeedupCompaction()) {
return Env::IO_HIGH;
}
return Env::IO_LOW;
} else {
if (write_controller->NeedsDelay()|| (write_controller->IsStopped()) {
return Env::IO_USER;
} else if (write_controller->NeedSpeedupCompaction()) {
return Env::IO_HIGH;
}
return Env::IO_LOW;
}
}
#ifndef ROCKSDB_LITE
std::string CompactionServiceCompactionJob::GetTableFileName(
uint64_t file_number) {

View File

@ -234,6 +234,8 @@ class CompactionJob {
// Get table file name in where it's outputting to, which should also be in
// `output_directory_`.
virtual std::string GetTableFileName(uint64_t file_number);
// The rate limiter priority (io_priority) is determined dynamically here.
Env::IOPriority GetRateLimiterPriority(const RateLimiter::OpType op_type);
};
// CompactionServiceInput is used the pass compaction information between two

View File

@ -809,6 +809,10 @@ Status FlushJob::WriteLevel0Table() {
{
auto write_hint = cfd_->CalculateSSTWriteHint(0);
// https://fburl.com/code/zxh9nif4 : All of WriteController functions
// are to be called while holding DB mutex.
Env::IOPriority io_priority =
GetRateLimiterPriority(RateLimiter::OpType::kWrite);
db_mutex_->Unlock();
if (log_buffer_) {
log_buffer_->FlushBufferToLog();
@ -924,7 +928,7 @@ Status FlushJob::WriteLevel0Table() {
snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, write_hint, full_history_ts_low,
io_priority, &table_properties_, write_hint, full_history_ts_low,
blob_callback_, &num_input_entries, &memtable_payload_bytes,
&memtable_garbage_bytes);
// TODO: Cleanup io_status in BuildTable and table builders
@ -1064,6 +1068,22 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
return info;
}
Env::IOPriority FlushJob::GetRateLimiterPriority(
const RateLimiter::OpType op_type) {
WriteController* write_controller =
versions_->GetColumnFamilySet()->write_controller();
if (op_type == RateLimiter::OpType::kWrite) {
if (write_controller->IsStopped() || write_controller->NeedsDelay()) {
return Env::IO_USER;
}
return Env::IO_HIGH;
} else {
return Env::IO_USER;
}
}
}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

View File

@ -123,6 +123,8 @@ class FlushJob {
bool MemPurgeDecider();
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
// The rate limiter priority (io_priority) is determined dynamically here.
Env::IOPriority GetRateLimiterPriority(const RateLimiter::OpType op_type);
#endif // !ROCKSDB_LITE
const std::string& dbname_;

View File

@ -658,6 +658,15 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) {
ASSERT_TRUE(to_delete.empty());
}
TEST_F(FlushJobTimestampTest, GetRateLimiterPriority) {
// When the state from WriteController is normal.
ASSERT_EQ(GetRateLimiterPriority(RateLimiter::OpType::kWrite), Env::IO_HIGH);
// When the state from WriteController is Delayed.
// When the state from WriteController is Stopped.
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -52,7 +52,7 @@ class WriteController {
bool IsStopped() const;
bool NeedsDelay() const { return total_delayed_.load() > 0; }
bool NeedSpeedupCompaction() const {
return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
return IsStopped() || NeedsDelay() || total_compaction_pressure_.load() > 0;
}
// return how many microseconds the caller needs to sleep after the call
// num_bytes: how many number of bytes to put into the DB.

View File

@ -54,10 +54,12 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
UpdateFileChecksum(data);
{
IOOptions io_options;
UpdateIOOptionsIfNeeded(io_options, op_rate_limiter_priority);
IOSTATS_TIMER_GUARD(prepare_write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
IOOptions(), nullptr);
io_options, nullptr);
}
// See whether we need to enlarge the buffer to avoid the flush
@ -211,6 +213,8 @@ IOStatus WritableFileWriter::Close() {
s = Flush(); // flush cache to OS
IOStatus interim;
IOOptions io_options;
io_options.rate_limiter_priority = writable_file_->GetIOPriority();
// In direct I/O mode we write whole pages so
// we need to let the file know where data ends.
if (use_direct_io()) {
@ -221,7 +225,7 @@ IOStatus WritableFileWriter::Close() {
start_ts = FileOperationInfo::StartNow();
}
#endif
interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
interim = writable_file_->Truncate(filesize_, io_options, nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
@ -241,7 +245,7 @@ IOStatus WritableFileWriter::Close() {
start_ts = FileOperationInfo::StartNow();
}
#endif
interim = writable_file_->Fsync(IOOptions(), nullptr);
interim = writable_file_->Fsync(io_options, nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
@ -267,7 +271,7 @@ IOStatus WritableFileWriter::Close() {
start_ts = FileOperationInfo::StartNow();
}
#endif
interim = writable_file_->Close(IOOptions(), nullptr);
interim = writable_file_->Close(io_options, nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
@ -331,7 +335,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
start_ts = FileOperationInfo::StartNow();
}
#endif
s = writable_file_->Flush(IOOptions(), nullptr);
IOOptions io_options;
UpdateIOOptionsIfNeeded(io_options, op_rate_limiter_priority);
s = writable_file_->Flush(io_options, nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
@ -435,10 +441,12 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
start_ts = FileOperationInfo::StartNow();
}
#endif
IOOptions io_options;
io_options.rate_limiter_priority = writable_file_->GetIOPriority();
if (use_fsync) {
s = writable_file_->Fsync(IOOptions(), nullptr);
s = writable_file_->Fsync(io_options, nullptr);
} else {
s = writable_file_->Sync(IOOptions(), nullptr);
s = writable_file_->Sync(io_options, nullptr);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
@ -466,7 +474,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
start_ts = FileOperationInfo::StartNow();
}
#endif
IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
IOOptions io_options;
io_options.rate_limiter_priority = writable_file_->GetIOPriority();
IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
@ -490,19 +500,21 @@ IOStatus WritableFileWriter::WriteBuffered(
size_t left = size;
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
IOOptions io_options;
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
while (left > 0) {
size_t allowed;
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
if (rate_limiter_ != nullptr &&
rate_limiter_priority_used != Env::IO_TOTAL) {
allowed = rate_limiter_->RequestToken(left, 0 /* alignment */,
rate_limiter_priority_used, stats_,
RateLimiter::OpType::kWrite);
} else {
allowed = left;
size_t allowed = left;
if (rate_limiter_priority_used != Env::IO_TOTAL) {
if (rate_limiter_ != nullptr) {
allowed = rate_limiter_->RequestToken(
left, 0 /* alignment */, rate_limiter_priority_used, stats_,
RateLimiter::OpType::kWrite);
} else {
io_options.rate_limiter_priority = rate_limiter_priority_used;
}
}
{
@ -511,7 +523,7 @@ IOStatus WritableFileWriter::WriteBuffered(
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint start_ts;
uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr);
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
old_size = next_write_offset_;
@ -524,10 +536,10 @@ IOStatus WritableFileWriter::WriteBuffered(
if (perform_data_verification_) {
Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info,
s = writable_file_->Append(Slice(src, allowed), io_options, v_info,
nullptr);
} else {
s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
s = writable_file_->Append(Slice(src, allowed), io_options, nullptr);
}
if (!s.ok()) {
// If writable_file_->Append() failed, then the data may or may not
@ -579,6 +591,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
size_t left = size;
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
IOOptions io_options;
// Check how much is allowed. Here, we loop until the rate limiter allows to
// write the entire buffer.
@ -588,13 +601,17 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
while (data_size > 0) {
size_t tmp_size;
tmp_size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
rate_limiter_priority_used, stats_,
RateLimiter::OpType::kWrite);
data_size -= tmp_size;
if (rate_limiter_priority_used != Env::IO_TOTAL) {
if (rate_limiter_ != nullptr) {
while (data_size > 0) {
size_t tmp_size;
tmp_size = rate_limiter_->RequestToken(
data_size, buf_.Alignment(), rate_limiter_priority_used, stats_,
RateLimiter::OpType::kWrite);
data_size -= tmp_size;
}
} else {
io_options.rate_limiter_priority = rate_limiter_priority_used;
}
}
@ -604,7 +621,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint start_ts;
uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
uint64_t old_size = writable_file_->GetFileSize(io_options, nullptr);
if (ShouldNotifyListeners()) {
start_ts = FileOperationInfo::StartNow();
old_size = next_write_offset_;
@ -617,8 +634,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->Append(Slice(src, left), IOOptions(), v_info,
nullptr);
s = writable_file_->Append(Slice(src, left), io_options, v_info, nullptr);
SetPerfLevel(prev_perf_level);
}
#ifndef ROCKSDB_LITE
@ -709,20 +725,22 @@ IOStatus WritableFileWriter::WriteDirect(
size_t left = buf_.CurrentSize();
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
IOOptions io_options;
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
while (left > 0) {
// Check how much is allowed
size_t size;
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
if (rate_limiter_ != nullptr &&
rate_limiter_priority_used != Env::IO_TOTAL) {
size = rate_limiter_->RequestToken(left, buf_.Alignment(),
writable_file_->GetIOPriority(),
stats_, RateLimiter::OpType::kWrite);
} else {
size = left;
size_t size = left;
if (rate_limiter_priority_used != Env::IO_TOTAL) {
if (rate_limiter_ != nullptr) {
size = rate_limiter_->RequestToken(left, buf_.Alignment(),
writable_file_->GetIOPriority(),
stats_, RateLimiter::OpType::kWrite);
} else {
io_options.rate_limiter_priority = rate_limiter_priority_used;
}
}
{
@ -737,10 +755,10 @@ IOStatus WritableFileWriter::WriteDirect(
Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
IOOptions(), v_info, nullptr);
io_options, v_info, nullptr);
} else {
s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
IOOptions(), nullptr);
io_options, nullptr);
}
if (ShouldNotifyListeners()) {
@ -809,6 +827,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
size_t left = buf_.CurrentSize();
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];
IOOptions io_options;
// Check how much is allowed. Here, we loop until the rate limiter allows to
// write the entire buffer.
@ -818,13 +837,16 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
while (data_size > 0) {
if (rate_limiter_priority_used != Env::IO_TOTAL) {
if (rate_limiter_ != nullptr) {
size_t size;
size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
writable_file_->GetIOPriority(),
stats_, RateLimiter::OpType::kWrite);
data_size -= size;
} else {
io_options.rate_limiter_priority = rate_limiter_priority_used;
}
}
@ -839,7 +861,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->PositionedAppend(Slice(src, left), write_offset,
IOOptions(), v_info, nullptr);
io_options, v_info, nullptr);
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
@ -894,4 +916,14 @@ Env::IOPriority WritableFileWriter::DecideRateLimiterPriority(
return op_rate_limiter_priority;
}
}
void WritableFileWriter::UpdateIOOptionsIfNeeded(
IOOptions& io_options, const Env::IOPriority op_rate_limiter_priority) {
Env::IOPriority rate_limiter_priority_used =
WritableFileWriter::DecideRateLimiterPriority(
writable_file_->GetIOPriority(), op_rate_limiter_priority);
if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
io_options.rate_limiter_priority = rate_limiter_priority_used;
}
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -281,6 +281,10 @@ class WritableFileWriter {
Env::IOPriority writable_file_io_priority,
Env::IOPriority op_rate_limiter_priority);
// If rate_limiter_ is nullptr, the rate_limiter_priority may be updated.
void UpdateIOOptionsIfNeeded(IOOptions& io_options,
const Env::IOPriority op_rate_limiter_priority);
// Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE