Fix Timer unable to schedule new added job (#7216)

Summary:
And added test to reproduce the problem.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7216

Reviewed By: riversand963

Differential Revision: D22905193

Pulled By: jay-zhuang

fbshipit-source-id: 8ca1435c91bf829f9076c743bdd66861364ff68c
This commit is contained in:
Jay Zhuang 2020-08-04 09:18:45 -07:00 committed by Facebook GitHub Bot
parent 8cb278d11a
commit fea286d914
2 changed files with 46 additions and 0 deletions

View File

@ -15,6 +15,7 @@
#include "monitoring/instrumented_mutex.h" #include "monitoring/instrumented_mutex.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -56,6 +57,7 @@ class Timer {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
heap_.push(fn_info.get()); heap_.push(fn_info.get());
map_.emplace(std::make_pair(fn_name, std::move(fn_info))); map_.emplace(std::make_pair(fn_name, std::move(fn_info)));
cond_var_.Signal();
} }
void Cancel(const std::string& fn_name) { void Cancel(const std::string& fn_name) {
@ -112,6 +114,7 @@ class Timer {
while (running_) { while (running_) {
if (heap_.empty()) { if (heap_.empty()) {
// wait // wait
TEST_SYNC_POINT("Timer::Run::Waiting");
cond_var_.Wait(); cond_var_.Wait();
continue; continue;
} }

View File

@ -237,6 +237,49 @@ TEST_F(TimerTest, MultipleScheduleRepeatedlyTest) {
ASSERT_EQ(count2, 5); ASSERT_EQ(count2, 5);
} }
TEST_F(TimerTest, AddAfterStartTest) {
const int kIterations = 5;
InstrumentedMutex mutex;
InstrumentedCondVar test_cv(&mutex);
// wait timer to run and then add a new job
SyncPoint::GetInstance()->LoadDependency(
{{"Timer::Run::Waiting", "TimerTest:AddAfterStartTest:1"}});
SyncPoint::GetInstance()->EnableProcessing();
mock_env_->set_current_time(0);
Timer timer(mock_env_.get());
ASSERT_TRUE(timer.Start());
TEST_SYNC_POINT("TimerTest:AddAfterStartTest:1");
int count = 0;
timer.Add(
[&] {
InstrumentedMutexLock l(&mutex);
count++;
if (count >= kIterations) {
test_cv.SignalAll();
}
},
"fn_sch_test", 1 * kSecond, 1 * kSecond);
// Wait for execution to finish
uint64_t time_counter = 0;
{
InstrumentedMutexLock l(&mutex);
while (count < kIterations) {
time_counter += kSecond;
mock_env_->set_current_time(time_counter);
test_cv.TimedWait(time_counter);
}
}
ASSERT_TRUE(timer.Shutdown());
ASSERT_EQ(kIterations, count);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {