Fix a timer_test deadlock (#7277)

Summary:
There's a potential deadlock when the MockTimeEnv time value grows to a large number, which causes TimedWait() to wait forever. The test misuses microseconds as seconds, making this more likely to happen.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7277

Reviewed By: pdillinger

Differential Revision: D23183873

Pulled By: jay-zhuang

fbshipit-source-id: 6fc38ebd40b4125a99551204b271f91a27e70086
This commit is contained in:
Jay Zhuang 2020-08-20 08:42:05 -07:00 committed by Facebook GitHub Bot
parent 2040bb545b
commit 3e422ce0ca
7 changed files with 223 additions and 238 deletions

View File

@ -1150,28 +1150,4 @@ class DBTestBase : public testing::Test {
bool time_elapse_only_sleep_on_reopen_ = false;
};
// MockTimeEnv variant whose constructor resets the global SyncPoint callbacks
// and, on macOS debug builds only, registers a hook that keeps TimedWait
// deadlines from lying in the past.
class SafeMockTimeEnv : public MockTimeEnv {
 public:
  explicit SafeMockTimeEnv(Env* base) : MockTimeEnv(base) {
    auto* const sync_point = SyncPoint::GetInstance();
    // Start from a clean slate: every previously registered callback is
    // dropped before the (optional) TimedWait hook is installed.
    sync_point->DisableProcessing();
    sync_point->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
    // Alternative to SpecialEnv for platforms where pthread_cond_timedwait
    // does not appear to release the lock for other threads when the deadline
    // has already passed. (TimedWait is a leaky abstraction here: callers
    // usually derive the deadline from Env time, yet it is interpreted
    // against the real clock.)
    sync_point->SetCallBack(
        "InstrumentedCondVar::TimedWaitInternal", [this](void* arg) {
          uint64_t* const deadline_us = reinterpret_cast<uint64_t*>(arg);
          // An expired deadline is moved to 1000us after the real clock.
          if (*deadline_us < this->RealNowMicros()) {
            *deadline_us = this->RealNowMicros() + 1000;
          }
        });
#endif  // OS_MACOSX && !NDEBUG
    sync_point->EnableProcessing();
  }
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -14,12 +14,13 @@ class StatsDumpSchedulerTest : public DBTestBase {
public:
StatsDumpSchedulerTest()
: DBTestBase("/stats_dump_scheduler_test", /*env_do_fsync=*/true),
mock_env_(new SafeMockTimeEnv(Env::Default())) {}
mock_env_(new MockTimeEnv(Env::Default())) {}
protected:
std::unique_ptr<SafeMockTimeEnv> mock_env_;
std::unique_ptr<MockTimeEnv> mock_env_;
void SetUp() override {
mock_env_->InstallTimedWaitFixCallback();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) {
auto* stats_dump_scheduler_ptr =

View File

@ -33,12 +33,13 @@ class StatsHistoryTest : public DBTestBase {
public:
StatsHistoryTest()
: DBTestBase("/stats_history_test", /*env_do_fsync=*/true),
mock_env_(new SafeMockTimeEnv(Env::Default())) {}
mock_env_(new MockTimeEnv(Env::Default())) {}
protected:
std::unique_ptr<SafeMockTimeEnv> mock_env_;
std::unique_ptr<MockTimeEnv> mock_env_;
void SetUp() override {
mock_env_->InstallTimedWaitFixCallback();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) {
auto* stats_dump_scheduler_ptr =

View File

@ -41,6 +41,30 @@ class MockTimeEnv : public EnvWrapper {
current_time_ = time;
}
// Resets the global SyncPoint callbacks and, on macOS debug builds, registers
// a hook on "InstrumentedCondVar::TimedWaitInternal" that moves an
// already-expired deadline to 1000us after the real clock.
// Every previously registered SyncPoint callback is cleared, so call this
// before installing any test-specific callbacks.
// TODO: this is a workaround for platform-dependent behavior of timedwait
// timeouts. Ideally the timedwait API should be moved to env.
// Details: PR #7101.
void InstallTimedWaitFixCallback() {
  auto* const sync_point = SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
  // Alternative to SpecialEnv for platforms where pthread_cond_timedwait
  // does not appear to release the lock for other threads when the deadline
  // has already passed. (TimedWait is a leaky abstraction here: callers
  // usually derive the deadline from Env time, yet it is interpreted against
  // the real clock.)
  sync_point->SetCallBack(
      "InstrumentedCondVar::TimedWaitInternal", [this](void* arg) {
        uint64_t* const deadline_us = reinterpret_cast<uint64_t*>(arg);
        if (*deadline_us < this->RealNowMicros()) {
          *deadline_us = this->RealNowMicros() + 1000;
        }
      });
#endif  // OS_MACOSX && !NDEBUG
  sync_point->EnableProcessing();
}
private:
std::atomic<uint64_t> current_time_{0};
};

View File

@ -10,7 +10,9 @@
#include "port/port.h"
#include "rocksdb/env.h"
#ifndef NDEBUG
#include "test_util/mock_time_env.h"
#endif // !NDEBUG
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {

View File

@ -42,9 +42,15 @@ class Timer {
running_(false),
executing_task_(false) {}
// Add a new function. If the fn_name already exists, overriding it,
// regardless if the function is pending removed (invalid) or not.
// repeat_every_us == 0 means do not repeat
// Add a new function to run.
// fn_name has to be unique; otherwise, the new one overrides the existing
// one, regardless of whether the function is pending removal (invalid) or not.
// start_after_us is the initial delay.
// repeat_every_us is the interval between ending time of the last call and
// starting time of the next call. For example, repeat_every_us = 2000 and
// the function takes 1000us to run. If it starts at time [now]us, then it
// finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
// repeat_every_us == 0 means do not repeat.
void Add(std::function<void()> fn,
const std::string& fn_name,
uint64_t start_after_us,
@ -138,10 +144,18 @@ class Timer {
}
#ifndef NDEBUG
// Wait until the Timer starts waiting, call the optional callback, then wait
// for the Timer to start waiting again.
// Tests can provide a custom env object to mock time, and use the callback
// here to bump current time and trigger Timer. See timer_test for example.
//
// Note: this method supports only one caller.
void TEST_WaitForRun(std::function<void()> callback = nullptr) {
InstrumentedMutexLock l(&mutex_);
while (!heap_.empty() &&
heap_.top()->next_run_time_us <= env_->NowMicros()) {
// This acts as a spin lock
while (executing_task_ ||
(!heap_.empty() &&
heap_.top()->next_run_time_us <= env_->NowMicros())) {
cond_var_.TimedWait(env_->NowMicros() + 1000);
}
if (callback != nullptr) {
@ -150,8 +164,9 @@ class Timer {
cond_var_.SignalAll();
do {
cond_var_.TimedWait(env_->NowMicros() + 1000);
} while (!heap_.empty() &&
heap_.top()->next_run_time_us <= env_->NowMicros());
} while (
executing_task_ ||
(!heap_.empty() && heap_.top()->next_run_time_us <= env_->NowMicros()));
}
size_t TEST_GetPendingTaskNum() const {

View File

@ -15,272 +15,187 @@ class TimerTest : public testing::Test {
protected:
std::unique_ptr<MockTimeEnv> mock_env_;
#if defined(OS_MACOSX) && !defined(NDEBUG)
// On some platforms (MacOS) pthread_cond_timedwait does not appear
// to release the lock for other threads to operate if the deadline time
// is already passed. This is a problem for tests in general because
// TimedWait calls are a bad abstraction: the deadline parameter is
// usually computed from Env time, but is interpreted in real clock time.
// Since this test doesn't even pretend to use clock times, we have
// to mock TimedWait to ensure it yields.
void SetUp() override {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t* time_us = reinterpret_cast<uint64_t*>(arg);
if (*time_us < mock_env_->RealNowMicros()) {
*time_us = mock_env_->RealNowMicros() + 1000;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
}
#endif // OS_MACOSX && !NDEBUG
const uint64_t kSecond = 1000000; // 1sec = 1000000us
void SetUp() override { mock_env_->InstallTimedWaitFixCallback(); }
};
TEST_F(TimerTest, SingleScheduleOnceTest) {
const int kIterations = 1;
uint64_t time_counter = 0;
mock_env_->set_current_time(0);
InstrumentedMutex mutex;
InstrumentedCondVar test_cv(&mutex);
const int kInitDelaySec = 1;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
Timer timer(mock_env_.get());
int count = 0;
timer.Add(
[&] {
InstrumentedMutexLock l(&mutex);
count++;
if (count >= kIterations) {
test_cv.SignalAll();
}
},
"fn_sch_test", 1 * kSecond, 0);
timer.Add([&] { count++; }, "fn_sch_test", kInitDelaySec * kSecond, 0);
ASSERT_TRUE(timer.Start());
ASSERT_EQ(0, count);
// Wait for execution to finish
{
InstrumentedMutexLock l(&mutex);
while(count < kIterations) {
time_counter += kSecond;
mock_env_->set_current_time(time_counter);
test_cv.TimedWait(time_counter);
}
}
mock_time_sec += kInitDelaySec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, count);
ASSERT_TRUE(timer.Shutdown());
ASSERT_EQ(1, count);
}
TEST_F(TimerTest, MultipleScheduleOnceTest) {
const int kIterations = 1;
uint64_t time_counter = 0;
mock_env_->set_current_time(0);
InstrumentedMutex mutex1;
InstrumentedCondVar test_cv1(&mutex1);
const int kInitDelay1Sec = 1;
const int kInitDelay2Sec = 3;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
Timer timer(mock_env_.get());
int count1 = 0;
timer.Add(
[&] {
InstrumentedMutexLock l(&mutex1);
count1++;
if (count1 >= kIterations) {
test_cv1.SignalAll();
}
},
"fn_sch_test1", 1 * kSecond, 0);
InstrumentedMutex mutex2;
InstrumentedCondVar test_cv2(&mutex2);
int count1 = 0;
timer.Add([&] { count1++; }, "fn_sch_test1", kInitDelay1Sec * kSecond, 0);
int count2 = 0;
timer.Add(
[&] {
InstrumentedMutexLock l(&mutex2);
count2 += 5;
if (count2 >= kIterations) {
test_cv2.SignalAll();
}
},
"fn_sch_test2", 3 * kSecond, 0);
timer.Add([&] { count2++; }, "fn_sch_test2", kInitDelay2Sec * kSecond, 0);
ASSERT_TRUE(timer.Start());
ASSERT_EQ(0, count1);
ASSERT_EQ(0, count2);
// Wait for execution to finish
{
InstrumentedMutexLock l(&mutex1);
while (count1 < kIterations) {
time_counter += kSecond;
mock_env_->set_current_time(time_counter);
test_cv1.TimedWait(time_counter);
}
}
// Wait for execution to finish
{
InstrumentedMutexLock l(&mutex2);
while(count2 < kIterations) {
time_counter += kSecond;
mock_env_->set_current_time(time_counter);
test_cv2.TimedWait(time_counter);
}
}
ASSERT_TRUE(timer.Shutdown());
mock_time_sec = kInitDelay1Sec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, count1);
ASSERT_EQ(5, count2);
ASSERT_EQ(0, count2);
mock_time_sec = kInitDelay2Sec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, count1);
ASSERT_EQ(1, count2);
ASSERT_TRUE(timer.Shutdown());
}
TEST_F(TimerTest, SingleScheduleRepeatedlyTest) {
const int kIterations = 5;
uint64_t time_counter = 0;
mock_env_->set_current_time(0);
InstrumentedMutex mutex;
InstrumentedCondVar test_cv(&mutex);
const int kInitDelaySec = 1;
const int kRepeatSec = 1;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
Timer timer(mock_env_.get());
int count = 0;
timer.Add(
[&] {
InstrumentedMutexLock l(&mutex);
count++;
if (count >= kIterations) {
test_cv.SignalAll();
}
},
"fn_sch_test", 1 * kSecond, 1 * kSecond);
timer.Add([&] { count++; }, "fn_sch_test", kInitDelaySec * kSecond,
kRepeatSec * kSecond);
ASSERT_TRUE(timer.Start());
ASSERT_EQ(0, count);
mock_time_sec += kInitDelaySec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, count);
// Wait for execution to finish
{
InstrumentedMutexLock l(&mutex);
while(count < kIterations) {
time_counter += kSecond;
mock_env_->set_current_time(time_counter);
test_cv.TimedWait(time_counter);
}
for (int i = 1; i < kIterations; i++) {
mock_time_sec += kRepeatSec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
}
ASSERT_EQ(kIterations, count);
ASSERT_TRUE(timer.Shutdown());
ASSERT_EQ(5, count);
}
TEST_F(TimerTest, MultipleScheduleRepeatedlyTest) {
uint64_t time_counter = 0;
mock_env_->set_current_time(0);
const int kInitDelay1Sec = 0;
const int kInitDelay2Sec = 1;
const int kInitDelay3Sec = 0;
const int kRepeatSec = 2;
const int kLargeRepeatSec = 100;
const int kIterations = 5;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
Timer timer(mock_env_.get());
InstrumentedMutex mutex1;
InstrumentedCondVar test_cv1(&mutex1);
const int kIterations1 = 5;
int count1 = 0;
timer.Add(
[&] {
InstrumentedMutexLock l(&mutex1);
count1++;
if (count1 >= kIterations1) {
test_cv1.SignalAll();
}
},
"fn_sch_test1", 0, 2 * kSecond);
timer.Add([&] { count1++; }, "fn_sch_test1", kInitDelay1Sec * kSecond,
kRepeatSec * kSecond);
InstrumentedMutex mutex2;
InstrumentedCondVar test_cv2(&mutex2);
const int kIterations2 = 5;
int count2 = 0;
timer.Add(
[&] {
InstrumentedMutexLock l(&mutex2);
count2++;
if (count2 >= kIterations2) {
test_cv2.SignalAll();
}
},
"fn_sch_test2", 1 * kSecond, 2 * kSecond);
timer.Add([&] { count2++; }, "fn_sch_test2", kInitDelay2Sec * kSecond,
kRepeatSec * kSecond);
// Add a function with relatively large repeat interval
int count3 = 0;
timer.Add([&] { count3++; }, "fn_sch_test3", kInitDelay3Sec * kSecond,
kLargeRepeatSec * kSecond);
ASSERT_TRUE(timer.Start());
ASSERT_EQ(0, count2);
ASSERT_EQ(0, count3);
// Wait for execution to finish
{
InstrumentedMutexLock l(&mutex1);
while(count1 < kIterations1) {
time_counter += kSecond;
mock_env_->set_current_time(time_counter);
test_cv1.TimedWait(time_counter);
}
for (; count1 < kIterations; mock_time_sec++) {
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ((mock_time_sec + 2) / kRepeatSec, count1);
ASSERT_EQ((mock_time_sec + 1) / kRepeatSec, count2);
// large interval function should only run once (the first one).
ASSERT_EQ(1, count3);
}
timer.Cancel("fn_sch_test1");
// Wait for execution to finish
{
InstrumentedMutexLock l(&mutex2);
while(count2 < kIterations2) {
time_counter += kSecond;
mock_env_->set_current_time(time_counter);
test_cv2.TimedWait(time_counter);
}
}
mock_time_sec++;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(kIterations, count1);
ASSERT_EQ(kIterations, count2);
ASSERT_EQ(1, count3);
timer.Cancel("fn_sch_test2");
ASSERT_TRUE(timer.Shutdown());
ASSERT_EQ(kIterations, count1);
ASSERT_EQ(kIterations, count2);
ASSERT_EQ(count1, 5);
ASSERT_EQ(count2, 5);
// execute the long interval one
mock_time_sec = kLargeRepeatSec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(2, count3);
ASSERT_TRUE(timer.Shutdown());
}
TEST_F(TimerTest, AddAfterStartTest) {
const int kIterations = 5;
InstrumentedMutex mutex;
InstrumentedCondVar test_cv(&mutex);
const int kInitDelaySec = 1;
const int kRepeatSec = 1;
// wait timer to run and then add a new job
SyncPoint::GetInstance()->LoadDependency(
{{"Timer::Run::Waiting", "TimerTest:AddAfterStartTest:1"}});
SyncPoint::GetInstance()->EnableProcessing();
mock_env_->set_current_time(0);
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
Timer timer(mock_env_.get());
ASSERT_TRUE(timer.Start());
TEST_SYNC_POINT("TimerTest:AddAfterStartTest:1");
int count = 0;
timer.Add(
[&] {
InstrumentedMutexLock l(&mutex);
count++;
if (count >= kIterations) {
test_cv.SignalAll();
}
},
"fn_sch_test", 1 * kSecond, 1 * kSecond);
timer.Add([&] { count++; }, "fn_sch_test", kInitDelaySec * kSecond,
kRepeatSec * kSecond);
ASSERT_EQ(0, count);
// Wait for execution to finish
uint64_t time_counter = 0;
{
InstrumentedMutexLock l(&mutex);
while (count < kIterations) {
time_counter += kSecond;
mock_env_->set_current_time(time_counter);
test_cv.TimedWait(time_counter);
}
mock_time_sec += kInitDelaySec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, count);
for (int i = 1; i < kIterations; i++) {
mock_time_sec += kRepeatSec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
}
ASSERT_EQ(kIterations, count);
ASSERT_TRUE(timer.Shutdown());
ASSERT_EQ(kIterations, count);
}
TEST_F(TimerTest, CancelRunningTask) {
@ -356,35 +271,86 @@ TEST_F(TimerTest, ShutdownRunningTask) {
delete value;
}
TEST_F(TimerTest, AddSameFuncNameTest) {
mock_env_->set_current_time(0);
TEST_F(TimerTest, AddSameFuncName) {
const int kInitDelaySec = 1;
const int kRepeat1Sec = 5;
const int kRepeat2Sec = 4;
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
Timer timer(mock_env_.get());
ASSERT_TRUE(timer.Start());
int func_counter1 = 0;
timer.Add([&] { func_counter1++; }, "duplicated_func", 1 * kSecond,
5 * kSecond);
timer.Add([&] { func_counter1++; }, "duplicated_func",
kInitDelaySec * kSecond, kRepeat1Sec * kSecond);
int func2_counter = 0;
timer.Add([&] { func2_counter++; }, "func2", 1 * kSecond, 4 * kSecond);
timer.Add([&] { func2_counter++; }, "func2", kInitDelaySec * kSecond,
kRepeat2Sec * kSecond);
// New function with the same name should override the existing one
int func_counter2 = 0;
timer.Add([&] { func_counter2++; }, "duplicated_func", 1 * kSecond,
5 * kSecond);
timer.Add([&] { func_counter2++; }, "duplicated_func",
kInitDelaySec * kSecond, kRepeat1Sec * kSecond);
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(1); });
ASSERT_EQ(0, func_counter1);
ASSERT_EQ(0, func2_counter);
ASSERT_EQ(0, func_counter2);
ASSERT_EQ(func_counter1, 0);
ASSERT_EQ(func2_counter, 1);
ASSERT_EQ(func_counter2, 1);
mock_time_sec += kInitDelaySec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(6); });
ASSERT_EQ(0, func_counter1);
ASSERT_EQ(1, func2_counter);
ASSERT_EQ(1, func_counter2);
ASSERT_EQ(func_counter1, 0);
ASSERT_EQ(func2_counter, 2);
ASSERT_EQ(func_counter2, 2);
mock_time_sec += kRepeat1Sec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(0, func_counter1);
ASSERT_EQ(2, func2_counter);
ASSERT_EQ(2, func_counter2);
ASSERT_TRUE(timer.Shutdown());
}
// Verifies that the repeat interval is counted from the end of one run to the
// start of the next one. The scheduled task itself advances the mock clock by
// kFuncRunningTimeSec, so the second run only becomes due kRepeatSec after
// that advanced time.
TEST_F(TimerTest, RepeatIntervalWithFuncRunningTime) {
const int kInitDelaySec = 1;
const int kRepeatSec = 5;
const int kFuncRunningTimeSec = 1;
// Mock clock value driven by this test; presumably in seconds, matching what
// set_current_time() expects — confirm against MockTimeEnv.
int mock_time_sec = 0;
mock_env_->set_current_time(mock_time_sec);
Timer timer(mock_env_.get());
ASSERT_TRUE(timer.Start());
int func_counter = 0;
timer.Add(
[&] {
// Simulate the task taking kFuncRunningTimeSec by moving the mock clock
// forward from inside the task.
mock_env_->set_current_time(mock_time_sec + kFuncRunningTimeSec);
func_counter++;
},
"func", kInitDelaySec * kSecond, kRepeatSec * kSecond);
// Nothing has run yet: the mock clock has not been advanced.
ASSERT_EQ(0, func_counter);
// Step the clock to the initial delay; the first run is expected.
mock_time_sec += kInitDelaySec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, func_counter);
// After repeat interval time, the function is not executed, as running
// the function takes some time (`kFuncRunningTimeSec`). The repeat interval
// is the time between ending time of the last call and starting time of the
// next call.
mock_time_sec += kRepeatSec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, func_counter);
// Adding the task's running time reaches the point where the second run is
// due.
mock_time_sec += kFuncRunningTimeSec;
timer.TEST_WaitForRun([&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(2, func_counter);
ASSERT_TRUE(timer.Shutdown());
}