Avoid missed signals on a default promise
Motivation: Today when awaiting uninterruptibly on a default promise, a race condition can lead to a missed signal. Quite simply, the check for whether the condition holds is not made inside a lock before waiting. This means that the waiting thread can enter the wait after the promise has completed and will thus not be notified, thus missing the signal. This leads to the waiting thread to enter a timed wait that will only trip with the timeout elapses leading to unnecessarily long waits (imagine a connection timeout, and the waiting thread missed the signal that the connection is ready). Modification: This commit fixes this missed signal by checking the condition inside a lock. We also add a test that reliably fails without the non-racy condition check. Result: Timed uninterruptible waits on default promise will not race against the condition and possibly wait longer than necessary.
This commit is contained in:
parent
ff51dfce03
commit
9380fdeb43
@ -599,6 +599,9 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
try {
|
try {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
if (isDone()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
incWaiters();
|
incWaiters();
|
||||||
try {
|
try {
|
||||||
wait(waitTime / 1000000, (int) (waitTime % 1000000));
|
wait(waitTime / 1000000, (int) (waitTime % 1000000));
|
||||||
|
@ -21,6 +21,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
|||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
@ -32,8 +34,10 @@ import java.util.concurrent.TimeoutException;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static java.lang.Math.max;
|
import static java.lang.Math.max;
|
||||||
|
import static org.hamcrest.Matchers.lessThan;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -193,6 +197,39 @@ public class DefaultPromiseTest {
|
|||||||
testLateListenerIsOrderedCorrectly(fakeException());
|
testLateListenerIsOrderedCorrectly(fakeException());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSignalRace() {
|
||||||
|
final long wait = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
|
||||||
|
EventExecutor executor = null;
|
||||||
|
try {
|
||||||
|
executor = new TestEventExecutor();
|
||||||
|
|
||||||
|
final int numberOfAttempts = 4096;
|
||||||
|
final Map<Thread, DefaultPromise<Void>> promises = new HashMap<Thread, DefaultPromise<Void>>();
|
||||||
|
for (int i = 0; i < numberOfAttempts; i++) {
|
||||||
|
final DefaultPromise<Void> promise = new DefaultPromise<Void>(executor);
|
||||||
|
final Thread thread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
promise.setSuccess(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
promises.put(thread, promise);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final Map.Entry<Thread, DefaultPromise<Void>> promise : promises.entrySet()) {
|
||||||
|
promise.getKey().start();
|
||||||
|
final long start = System.nanoTime();
|
||||||
|
promise.getValue().awaitUninterruptibly(wait, TimeUnit.NANOSECONDS);
|
||||||
|
assertThat(System.nanoTime() - start, lessThan(wait));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (executor != null) {
|
||||||
|
executor.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void testStackOverFlowChainedFuturesA(int promiseChainLength, final EventExecutor executor,
|
private void testStackOverFlowChainedFuturesA(int promiseChainLength, final EventExecutor executor,
|
||||||
boolean runTestInExecutorThread)
|
boolean runTestInExecutorThread)
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
|
Loading…
Reference in New Issue
Block a user