Fix race condition in the NonStickyEventExecutorGroup (#8232)

Motivation:

There was a race condition between the task submitter and task executor threads such that the last Runnable submitted may not get executed. 

Modifications:

The bug was fixed by checking the task queue and state in the task executor thread after it saw the task queue was empty.

Result:

Fixes #8230
This commit is contained in:
Terence Yim 2018-08-29 10:42:01 -07:00 committed by Norman Maurer
parent 54f565ac67
commit 79706357c7
2 changed files with 48 additions and 1 deletions

View File

@ -259,7 +259,24 @@ public final class NonStickyEventExecutorGroup implements EventExecutorGroup {
}
} else {
state.set(NONE);
return; // done
// After setting the state to NONE, look at the tasks queue one more time.
// If it is empty, then we can return from this method.
// Otherwise, it means the producer thread has called execute(Runnable)
// and enqueued a task in between the tasks.poll() above and the state.set(NONE) here.
// There are two possible scenarios when this happen
//
// 1. The producer thread sees state == NONE, hence the compareAndSet(NONE, SUBMITTED)
// is successfully setting the state to SUBMITTED. This mean the producer
// will call / has called executor.execute(this). In this case, we can just return.
// 2. The producer thread don't see the state change, hence the compareAndSet(NONE, SUBMITTED)
// returns false. In this case, the producer thread won't call executor.execute.
// In this case, we need to change the state to RUNNING and keeps running.
//
// The above cases can be distinguished by performing a
// compareAndSet(NONE, RUNNING). If it returns "false", it is case 1; otherwise it is case 2.
if (tasks.peek() == null || !state.compareAndSet(NONE, RUNNING)) {
return; // done
}
}
}
}

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -93,6 +94,35 @@ public class NonStickyEventExecutorGroupTest {
}
}
@Test
public void testRaceCondition() throws InterruptedException {
EventExecutorGroup group = new UnorderedThreadPoolEventExecutor(1);
NonStickyEventExecutorGroup nonStickyGroup = new NonStickyEventExecutorGroup(group, maxTaskExecutePerRun);
try {
EventExecutor executor = nonStickyGroup.next();
for (int j = 0; j < 5000; j++) {
final CountDownLatch firstCompleted = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
firstCompleted.countDown();
latch.countDown();
}
});
Assert.assertTrue(firstCompleted.await(1, TimeUnit.SECONDS));
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
} finally {
nonStickyGroup.shutdownGracefully();
}
}
private static void execute(EventExecutorGroup group, CountDownLatch startLatch) throws Throwable {
EventExecutor executor = group.next();
Assert.assertTrue(executor instanceof OrderedEventExecutor);