diff --git a/env/env_test.cc b/env/env_test.cc index 004adf26e..6e2420705 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -567,18 +567,19 @@ TEST_P(EnvPosixTestWithParam, TwoPools) { } TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { + constexpr int kWaitMicros = 60000000; // 1min + std::vector tasks(10); // Set number of thread to 1 first. env_->SetBackgroundThreads(1, Env::Priority::HIGH); - Env::Default()->SleepForMicroseconds(kDelayMicros); // Schedule 3 tasks. 0 running; Task 1, 2 waiting. for (size_t i = 0; i < 3; i++) { env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], Env::Priority::HIGH); - Env::Default()->SleepForMicroseconds(kDelayMicros); } + ASSERT_FALSE(tasks[0].TimedWaitUntilSleeping(kWaitMicros)); ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(!tasks[1].IsSleeping()); @@ -586,7 +587,7 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { // Increase to 2 threads. Task 0, 1 running; 2 waiting env_->SetBackgroundThreads(2, Env::Priority::HIGH); - Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_FALSE(tasks[1].TimedWaitUntilSleeping(kWaitMicros)); ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(tasks[1].IsSleeping()); @@ -602,7 +603,7 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { // The last task finishes. Task 0 running, 2 waiting. tasks[1].WakeUp(); - Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_FALSE(tasks[1].TimedWaitUntilDone(kWaitMicros)); ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_TRUE(tasks[0].IsSleeping()); ASSERT_TRUE(!tasks[1].IsSleeping()); @@ -610,16 +611,17 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { // Increase to 5 threads. Task 0 and 2 running. env_->SetBackgroundThreads(5, Env::Priority::HIGH); - Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_FALSE(tasks[2].TimedWaitUntilSleeping(kWaitMicros)); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); ASSERT_TRUE(tasks[2].IsSleeping()); // Change number of threads a couple of times while there is no sufficient // tasks. env_->SetBackgroundThreads(7, Env::Priority::HIGH); - Env::Default()->SleepForMicroseconds(kDelayMicros); tasks[2].WakeUp(); + ASSERT_FALSE(tasks[2].TimedWaitUntilDone(kWaitMicros)); ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); env_->SetBackgroundThreads(3, Env::Priority::HIGH); Env::Default()->SleepForMicroseconds(kDelayMicros); @@ -642,8 +644,13 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], Env::Priority::HIGH); } - Env::Default()->SleepForMicroseconds(kDelayMicros); + for (size_t i = 3; i <= 5; i++) { + ASSERT_FALSE(tasks[i].TimedWaitUntilSleeping(kWaitMicros)); + } ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); ASSERT_TRUE(tasks[3].IsSleeping()); ASSERT_TRUE(tasks[4].IsSleeping()); ASSERT_TRUE(tasks[5].IsSleeping()); @@ -655,8 +662,10 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { tasks[3].WakeUp(); tasks[4].WakeUp(); - Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + for (size_t i = 5; i < 8; i++) { + ASSERT_FALSE(tasks[i].TimedWaitUntilSleeping(kWaitMicros)); + } + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); for (size_t i = 5; i < 8; i++) { ASSERT_TRUE(tasks[i].IsSleeping()); } @@ -670,14 +679,14 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { // Wake up task 6. Task 5, 7 running tasks[6].WakeUp(); - Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_FALSE(tasks[6].TimedWaitUntilDone(kWaitMicros)); ASSERT_TRUE(tasks[5].IsSleeping()); ASSERT_TRUE(!tasks[6].IsSleeping()); ASSERT_TRUE(tasks[7].IsSleeping()); // Wake up threads 7. Task 5 running tasks[7].WakeUp(); - Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_FALSE(tasks[7].TimedWaitUntilDone(kWaitMicros)); ASSERT_TRUE(!tasks[7].IsSleeping()); // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running. @@ -701,20 +710,18 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { // Wake up thread 9. tasks[9].WakeUp(); - Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_FALSE(tasks[9].TimedWaitUntilDone(kWaitMicros)); ASSERT_TRUE(!tasks[9].IsSleeping()); ASSERT_TRUE(tasks[8].IsSleeping()); // Wake up thread 8 tasks[8].WakeUp(); - Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_FALSE(tasks[8].TimedWaitUntilDone(kWaitMicros)); ASSERT_TRUE(!tasks[8].IsSleeping()); // Wake up the last thread tasks[5].WakeUp(); - - Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_TRUE(!tasks[5].IsSleeping()); + ASSERT_FALSE(tasks[5].TimedWaitUntilDone(kWaitMicros)); WaitThreadPoolsEmpty(); } diff --git a/test_util/testutil.h b/test_util/testutil.h index 5fec97e02..7ec6a3c99 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -429,6 +429,20 @@ class SleepingBackgroundTask { bg_cv_.Wait(); } } + // Waits for the status to change to sleeping, + // otherwise times out. + // wait_time is in microseconds. + // Returns true when times out, false otherwise. + bool TimedWaitUntilSleeping(uint64_t wait_time) { + auto abs_time = Env::Default()->NowMicros() + wait_time; + MutexLock l(&mutex_); + while (!sleeping_ || !should_sleep_) { + if (bg_cv_.TimedWait(abs_time)) { + return true; + } + } + return false; + } void WakeUp() { MutexLock l(&mutex_); should_sleep_ = false; @@ -440,6 +454,18 @@ class SleepingBackgroundTask { bg_cv_.Wait(); } } + // Similar to TimedWaitUntilSleeping. + // Waits until the task is done. + bool TimedWaitUntilDone(uint64_t wait_time) { + auto abs_time = Env::Default()->NowMicros() + wait_time; + MutexLock l(&mutex_); + while (!done_with_sleep_) { + if (bg_cv_.TimedWait(abs_time)) { + return true; + } + } + return false; + } bool WokenUp() { MutexLock l(&mutex_); return should_sleep_ == false;