This commit is contained in:
bowang 2022-05-17 11:04:22 -07:00
parent 5860b2eb55
commit 1cfd841908
4 changed files with 16 additions and 14 deletions

View File

@ -2285,7 +2285,7 @@ Status CompactionJob::OpenCompactionOutputFile(
/*enable_hash=*/paranoid_file_checks_); /*enable_hash=*/paranoid_file_checks_);
} }
writable_file->SetIOPriority(GetRateLimiterPriorityForWrite()); writable_file->SetIOPriority(GetRateLimiterPriority());
writable_file->SetWriteLifeTimeHint(write_hint_); writable_file->SetWriteLifeTimeHint(write_hint_);
FileTypeSet tmp_set = db_options_.checksum_handoff_file_types; FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
writable_file->SetPreallocationBlockSize(static_cast<size_t>( writable_file->SetPreallocationBlockSize(static_cast<size_t>(
@ -2476,7 +2476,7 @@ std::string CompactionJob::GetTableFileName(uint64_t file_number) {
file_number, compact_->compaction->output_path_id()); file_number, compact_->compaction->output_path_id());
} }
Env::IOPriority CompactionJob::GetRateLimiterPriorityForWrite() { Env::IOPriority CompactionJob::GetRateLimiterPriority() {
if (versions_ && versions_->GetColumnFamilySet() && if (versions_ && versions_->GetColumnFamilySet() &&
versions_->GetColumnFamilySet()->write_controller()) { versions_->GetColumnFamilySet()->write_controller()) {
WriteController* write_controller = WriteController* write_controller =

View File

@ -237,7 +237,9 @@ class CompactionJob {
// `output_directory_`. // `output_directory_`.
virtual std::string GetTableFileName(uint64_t file_number); virtual std::string GetTableFileName(uint64_t file_number);
// The rate limiter priority (io_priority) is determined dynamically here. // The rate limiter priority (io_priority) is determined dynamically here.
Env::IOPriority GetRateLimiterPriorityForWrite(); // The Compaction Read and Write priorities are the same for different
// scenarios, such as write stalled.
Env::IOPriority GetRateLimiterPriority();
}; };
// CompactionServiceInput is used the pass compaction information between two // CompactionServiceInput is used the pass compaction information between two

View File

@ -393,13 +393,13 @@ class CompactionJobTestBase : public testing::Test {
} }
if (check_get_priority) { if (check_get_priority) {
CheckGetRateLimiterPriorityForWrite(compaction_job); CheckGetRateLimiterPriority(compaction_job);
} }
} }
void CheckGetRateLimiterPriorityForWrite(CompactionJob& compaction_job) { void CheckGetRateLimiterPriority(CompactionJob& compaction_job) {
// When the state from WriteController is normal. // When the state from WriteController is normal.
ASSERT_EQ(compaction_job.GetRateLimiterPriorityForWrite(), Env::IO_LOW); ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_LOW);
WriteController* write_controller = WriteController* write_controller =
compaction_job.versions_->GetColumnFamilySet()->write_controller(); compaction_job.versions_->GetColumnFamilySet()->write_controller();
@ -408,21 +408,21 @@ class CompactionJobTestBase : public testing::Test {
// When the state from WriteController is CompactionPressure. // When the state from WriteController is CompactionPressure.
std::unique_ptr<WriteControllerToken> compaction_pressure_token = std::unique_ptr<WriteControllerToken> compaction_pressure_token =
write_controller->GetCompactionPressureToken(); write_controller->GetCompactionPressureToken();
ASSERT_EQ(compaction_job.GetRateLimiterPriorityForWrite(), Env::IO_HIGH); ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_HIGH);
} }
{ {
// When the state from WriteController is Delayed. // When the state from WriteController is Delayed.
std::unique_ptr<WriteControllerToken> delay_token = std::unique_ptr<WriteControllerToken> delay_token =
write_controller->GetDelayToken(1000000); write_controller->GetDelayToken(1000000);
ASSERT_EQ(compaction_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
} }
{ {
// When the state from WriteController is Stopped. // When the state from WriteController is Stopped.
std::unique_ptr<WriteControllerToken> stop_token = std::unique_ptr<WriteControllerToken> stop_token =
write_controller->GetStopToken(); write_controller->GetStopToken();
ASSERT_EQ(compaction_job.GetRateLimiterPriorityForWrite(), Env::IO_USER); ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
} }
} }
@ -1322,7 +1322,7 @@ TEST_F(CompactionJobTest, ResultSerialization) {
} }
} }
TEST_F(CompactionJobTest, GetRateLimiterPriorityForWrite) { TEST_F(CompactionJobTest, GetRateLimiterPriority) {
NewDB(); NewDB();
auto expected_results = CreateTwoFiles(false); auto expected_results = CreateTwoFiles(false);

View File

@ -736,8 +736,8 @@ IOStatus WritableFileWriter::WriteDirect(
if (rate_limiter_ != nullptr && if (rate_limiter_ != nullptr &&
rate_limiter_priority_used != Env::IO_TOTAL) { rate_limiter_priority_used != Env::IO_TOTAL) {
size = rate_limiter_->RequestToken(left, buf_.Alignment(), size = rate_limiter_->RequestToken(left, buf_.Alignment(),
writable_file_->GetIOPriority(), rate_limiter_priority_used, stats_,
stats_, RateLimiter::OpType::kWrite); RateLimiter::OpType::kWrite);
} }
{ {
@ -839,8 +839,8 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
while (data_size > 0) { while (data_size > 0) {
size_t size; size_t size;
size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
writable_file_->GetIOPriority(), rate_limiter_priority_used, stats_,
stats_, RateLimiter::OpType::kWrite); RateLimiter::OpType::kWrite);
data_size -= size; data_size -= size;
} }
} }