diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 98d35140a..299aad5a5 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -137,6 +137,8 @@ class CompactionJob { IOStatus io_status_; private: + friend class CompactionJobTestBase; + // Generates a histogram representing potential divisions of key ranges from // the input. It adds the starting and/or ending keys of certain input files // to the working set and then finds the approximate size of data in between diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 72b9bd273..fb97050ba 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -321,7 +321,8 @@ class CompactionJobTestBase : public testing::Test { const std::vector& snapshots = {}, SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, int output_level = 1, bool verify = true, - uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber) { + uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber, + bool check_get_priority = false) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); size_t num_input_files = 0; @@ -390,6 +391,58 @@ class CompactionJobTestBase : public testing::Test { expected_oldest_blob_file_number); } } + + if (check_get_priority) { + CheckGetRateLimiterPriority(compaction_job); + } + } + + void CheckGetRateLimiterPriority(CompactionJob& compaction_job) { + // When the state from WriteController is normal. + ASSERT_EQ( + compaction_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite), + Env::IO_LOW); + ASSERT_EQ(compaction_job.GetRateLimiterPriority(RateLimiter::OpType::kRead), + Env::IO_LOW); + + WriteController* write_controller = + compaction_job.versions_->GetColumnFamilySet()->write_controller(); + + { + // When the state from WriteController is CompactionPressure. + std::unique_ptr compaction_pressure_token = + write_controller->GetCompactionPressureToken(); + ASSERT_EQ( + compaction_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite), + Env::IO_HIGH); + ASSERT_EQ( + compaction_job.GetRateLimiterPriority(RateLimiter::OpType::kRead), + Env::IO_HIGH); + } + + { + // When the state from WriteController is Delayed. + std::unique_ptr delay_token = + write_controller->GetDelayToken(1000000); + ASSERT_EQ( + compaction_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite), + Env::IO_USER); + ASSERT_EQ( + compaction_job.GetRateLimiterPriority(RateLimiter::OpType::kRead), + Env::IO_USER); + } + + { + // When the state from WriteController is Stopped. + std::unique_ptr stop_token = + write_controller->GetStopToken(); + ASSERT_EQ( + compaction_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite), + Env::IO_USER); + ASSERT_EQ( + compaction_job.GetRateLimiterPriority(RateLimiter::OpType::kRead), + Env::IO_USER); + } } std::shared_ptr env_guard_; @@ -1288,6 +1341,17 @@ TEST_F(CompactionJobTest, ResultSerialization) { } } +TEST_F(CompactionJobTest, GetRateLimiterPriority) { + NewDB(); + + auto expected_results = CreateTwoFiles(false); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto files = cfd->current()->storage_info()->LevelFiles(0); + ASSERT_EQ(2U, files.size()); + RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true, + kInvalidBlobFileNumber, true); +} + class CompactionJobTimestampTest : public CompactionJobTestBase { public: CompactionJobTimestampTest() diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 69a3e97a9..40e359018 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -581,20 +581,26 @@ TEST_F(FlushJobTest, GetRateLimiterPriority) { WriteController* write_controller = flush_job.versions_->GetColumnFamilySet()->write_controller(); - // When the state from WriteController is Delayed. - std::unique_ptr delay_token = - write_controller->GetDelayToken(1000000); - ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite), - Env::IO_USER); - ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead), - Env::IO_USER); - // When the state from WriteController is Stopped. - std::unique_ptr stop_token = - write_controller->GetStopToken(); - ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite), - Env::IO_USER); - ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead), - Env::IO_USER); + + { + // When the state from WriteController is Delayed. + std::unique_ptr delay_token = + write_controller->GetDelayToken(1000000); + ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite), + Env::IO_USER); + ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead), + Env::IO_USER); + } + + { + // When the state from WriteController is Stopped. + std::unique_ptr stop_token = + write_controller->GetStopToken(); + ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kWrite), + Env::IO_USER); + ASSERT_EQ(flush_job.GetRateLimiterPriority(RateLimiter::OpType::kRead), + Env::IO_USER); + } } class FlushJobTimestampTest : public FlushJobTestBase { @@ -727,8 +733,6 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) { ASSERT_TRUE(to_delete.empty()); } - - } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {