Avoid missed signals on a default promise
Motivation: Today when awaiting uninterruptibly on a default promise, a race condition can lead to a missed signal. Quite simply, the check for whether the condition holds is not made inside a lock before waiting. This means that the waiting thread can enter the wait after the promise has completed and will thus not be notified, thus missing the signal. This leads to the waiting thread to enter a timed wait that will only trip with the timeout elapses leading to unnecessarily long waits (imagine a connection timeout, and the waiting thread missed the signal that the connection is ready). Modification: This commit fixes this missed signal by checking the condition inside a lock. We also add a test that reliably fails without the non-racy condition check. Result: Timed uninterruptible waits on default promise will not race against the condition and possibly wait longer than necessary.
This commit is contained in:
parent
b37a41a535
commit
cfd6db7915
@ -599,6 +599,9 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
||||
try {
|
||||
for (;;) {
|
||||
synchronized (this) {
|
||||
if (isDone()) {
|
||||
return true;
|
||||
}
|
||||
incWaiters();
|
||||
try {
|
||||
wait(waitTime / 1000000, (int) (waitTime % 1000000));
|
||||
|
@ -21,6 +21,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@ -33,8 +35,10 @@ import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static java.lang.Math.max;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -204,6 +208,39 @@ public class DefaultPromiseTest {
|
||||
testLateListenerIsOrderedCorrectly(fakeException());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSignalRace() {
|
||||
final long wait = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
|
||||
EventExecutor executor = null;
|
||||
try {
|
||||
executor = new TestEventExecutor();
|
||||
|
||||
final int numberOfAttempts = 4096;
|
||||
final Map<Thread, DefaultPromise<Void>> promises = new HashMap<Thread, DefaultPromise<Void>>();
|
||||
for (int i = 0; i < numberOfAttempts; i++) {
|
||||
final DefaultPromise<Void> promise = new DefaultPromise<Void>(executor);
|
||||
final Thread thread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
promise.setSuccess(null);
|
||||
}
|
||||
});
|
||||
promises.put(thread, promise);
|
||||
}
|
||||
|
||||
for (final Map.Entry<Thread, DefaultPromise<Void>> promise : promises.entrySet()) {
|
||||
promise.getKey().start();
|
||||
final long start = System.nanoTime();
|
||||
promise.getValue().awaitUninterruptibly(wait, TimeUnit.NANOSECONDS);
|
||||
assertThat(System.nanoTime() - start, lessThan(wait));
|
||||
}
|
||||
} finally {
|
||||
if (executor != null) {
|
||||
executor.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testStackOverFlowChainedFuturesA(int promiseChainLength, final EventExecutor executor,
|
||||
boolean runTestInExecutorThread)
|
||||
throws InterruptedException {
|
||||
|
Loading…
Reference in New Issue
Block a user