From cfd6db79150fdb9ea91a3ff2eb6117c23299a12c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 11 Jul 2016 16:49:54 -0400 Subject: [PATCH] Avoid missed signals on a default promise Motivation: Today when awaiting uninterruptibly on a default promise, a race condition can lead to a missed signal. Quite simply, the check for whether the condition holds is not made inside a lock before waiting. This means that the waiting thread can enter the wait after the promise has completed and will thus not be notified, thus missing the signal. This leads to the waiting thread to enter a timed wait that will only trip with the timeout elapses leading to unnecessarily long waits (imagine a connection timeout, and the waiting thread missed the signal that the connection is ready). Modification: This commit fixes this missed signal by checking the condition inside a lock. We also add a test that reliably fails without the non-racy condition check. Result: Timed uninterruptible waits on default promise will not race against the condition and possibly wait longer than necessary. --- .../netty/util/concurrent/DefaultPromise.java | 3 ++ .../util/concurrent/DefaultPromiseTest.java | 37 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index cd19da4876..5b180cc11d 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -599,6 +599,9 @@ public class DefaultPromise extends AbstractFuture implements Promise { try { for (;;) { synchronized (this) { + if (isDone()) { + return true; + } incWaiters(); try { wait(waitTime / 1000000, (int) (waitTime % 1000000)); diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index d9aea41349..26b469f065 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -21,6 +21,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import org.junit.BeforeClass; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; @@ -33,8 +35,10 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import static java.lang.Math.max; +import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @SuppressWarnings("unchecked") @@ -204,6 +208,39 @@ public class DefaultPromiseTest { testLateListenerIsOrderedCorrectly(fakeException()); } + @Test + public void testSignalRace() { + final long wait = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS); + EventExecutor executor = null; + try { + executor = new TestEventExecutor(); + + final int numberOfAttempts = 4096; + final Map> promises = new HashMap>(); + for (int i = 0; i < numberOfAttempts; i++) { + final DefaultPromise promise = new DefaultPromise(executor); + final Thread thread = new Thread(new Runnable() { + @Override + public void run() { + promise.setSuccess(null); + } + }); + promises.put(thread, promise); + } + + for (final Map.Entry> promise : promises.entrySet()) { + promise.getKey().start(); + final long start = System.nanoTime(); + promise.getValue().awaitUninterruptibly(wait, TimeUnit.NANOSECONDS); + assertThat(System.nanoTime() - start, lessThan(wait)); + } + } finally { + if (executor != null) { + executor.shutdownGracefully(); + } + } + } + private void testStackOverFlowChainedFuturesA(int promiseChainLength, final EventExecutor executor, boolean runTestInExecutorThread) throws InterruptedException {