diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index cd19da4876..5b180cc11d 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -599,6 +599,9 @@ public class DefaultPromise extends AbstractFuture implements Promise { try { for (;;) { synchronized (this) { + if (isDone()) { + return true; + } incWaiters(); try { wait(waitTime / 1000000, (int) (waitTime % 1000000)); diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index d9aea41349..26b469f065 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -21,6 +21,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import org.junit.BeforeClass; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; @@ -33,8 +35,10 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import static java.lang.Math.max; +import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @SuppressWarnings("unchecked") @@ -204,6 +208,39 @@ public class DefaultPromiseTest { testLateListenerIsOrderedCorrectly(fakeException()); } + @Test + public void testSignalRace() { + final long wait = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS); + EventExecutor executor = null; + try { + executor = new TestEventExecutor(); + + final int numberOfAttempts = 4096; + final Map> promises = new HashMap>(); + for (int i = 0; i < numberOfAttempts; i++) { + final DefaultPromise promise = new DefaultPromise(executor); + final Thread thread = new Thread(new Runnable() { + @Override + public void run() { + promise.setSuccess(null); + } + }); + promises.put(thread, promise); + } + + for (final Map.Entry> promise : promises.entrySet()) { + promise.getKey().start(); + final long start = System.nanoTime(); + promise.getValue().awaitUninterruptibly(wait, TimeUnit.NANOSECONDS); + assertThat(System.nanoTime() - start, lessThan(wait)); + } + } finally { + if (executor != null) { + executor.shutdownGracefully(); + } + } + } + private void testStackOverFlowChainedFuturesA(int promiseChainLength, final EventExecutor executor, boolean runTestInExecutorThread) throws InterruptedException {