fix thread status synchronization in thread_list_test (#7825)
Summary: The test was flaky because the BG threads could increase `running_count_` up to `job_count_` before applying their thread status updates. Then the test thread would see non-deterministic results when counting threads with each status. The fix is to acquire mutex in test thread so it sees `running_count_` and thread status updated atomically. I think simply reordering the two updates would have been insufficient since the thread status update uses `memory_order_relaxed`. This change happens to also eliminate an undesirable sleep loop. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7825 Test Plan: injected sleeps to verify the failure repros before this PR and does not repro after. Reviewed By: jay-zhuang Differential Revision: D25742409 Pulled By: ajkr fbshipit-source-id: 926a2223fe856e20bc4c0c27df6736ee5cb02c97
This commit is contained in:
parent
bb0f781da0
commit
61e324422e
@ -38,6 +38,7 @@ class SimulatedBackgroundTask {
|
|||||||
void Run() {
|
void Run() {
|
||||||
std::unique_lock<std::mutex> l(mutex_);
|
std::unique_lock<std::mutex> l(mutex_);
|
||||||
running_count_++;
|
running_count_++;
|
||||||
|
bg_cv_.notify_all();
|
||||||
Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
|
Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
|
||||||
Env::Default()->GetThreadStatusUpdater()->SetThreadOperation(
|
Env::Default()->GetThreadStatusUpdater()->SetThreadOperation(
|
||||||
operation_type_);
|
operation_type_);
|
||||||
@ -58,9 +59,10 @@ class SimulatedBackgroundTask {
|
|||||||
bg_cv_.notify_all();
|
bg_cv_.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
void WaitUntilScheduled(int job_count, Env* env) {
|
void WaitUntilScheduled(int job_count) {
|
||||||
|
std::unique_lock<std::mutex> l(mutex_);
|
||||||
while (running_count_ < job_count) {
|
while (running_count_ < job_count) {
|
||||||
env->SleepForMicroseconds(1000);
|
bg_cv_.wait(l);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,8 +141,8 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
|
|||||||
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
||||||
&running_task, Env::Priority::LOW);
|
&running_task, Env::Priority::LOW);
|
||||||
}
|
}
|
||||||
running_task.WaitUntilScheduled(
|
running_task.WaitUntilScheduled(kSimulatedHighPriThreads +
|
||||||
kSimulatedHighPriThreads + kSimulatedLowPriThreads, env);
|
kSimulatedLowPriThreads);
|
||||||
|
|
||||||
std::vector<ThreadStatus> thread_list;
|
std::vector<ThreadStatus> thread_list;
|
||||||
|
|
||||||
@ -256,25 +258,25 @@ TEST_F(ThreadListTest, SimpleEventTest) {
|
|||||||
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
||||||
&flush_write_task, Env::Priority::HIGH);
|
&flush_write_task, Env::Priority::HIGH);
|
||||||
}
|
}
|
||||||
flush_write_task.WaitUntilScheduled(kFlushWriteTasks, env);
|
flush_write_task.WaitUntilScheduled(kFlushWriteTasks);
|
||||||
|
|
||||||
for (int t = 0; t < kCompactionWriteTasks; ++t) {
|
for (int t = 0; t < kCompactionWriteTasks; ++t) {
|
||||||
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
||||||
&compaction_write_task, Env::Priority::LOW);
|
&compaction_write_task, Env::Priority::LOW);
|
||||||
}
|
}
|
||||||
compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks, env);
|
compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks);
|
||||||
|
|
||||||
for (int t = 0; t < kCompactionReadTasks; ++t) {
|
for (int t = 0; t < kCompactionReadTasks; ++t) {
|
||||||
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
||||||
&compaction_read_task, Env::Priority::LOW);
|
&compaction_read_task, Env::Priority::LOW);
|
||||||
}
|
}
|
||||||
compaction_read_task.WaitUntilScheduled(kCompactionReadTasks, env);
|
compaction_read_task.WaitUntilScheduled(kCompactionReadTasks);
|
||||||
|
|
||||||
for (int t = 0; t < kCompactionWaitTasks; ++t) {
|
for (int t = 0; t < kCompactionWaitTasks; ++t) {
|
||||||
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
|
||||||
&compaction_wait_task, Env::Priority::LOW);
|
&compaction_wait_task, Env::Priority::LOW);
|
||||||
}
|
}
|
||||||
compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks, env);
|
compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks);
|
||||||
|
|
||||||
// verify the thread-status
|
// verify the thread-status
|
||||||
int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
|
int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
|
||||||
|
Loading…
Reference in New Issue
Block a user