diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 7211104338..674733ccdd 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -18,6 +18,7 @@ package io.netty.util.concurrent; import io.netty.util.Signal; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.InternalThreadLocalMap; +import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -27,7 +28,7 @@ import java.util.ArrayDeque; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; -import static java.util.concurrent.TimeUnit.*; +import static java.util.concurrent.TimeUnit.MILLISECONDS; public class DefaultPromise extends AbstractFuture implements Promise { @@ -576,7 +577,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { if (listeners instanceof DefaultFutureListeners) { final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners; - execute(executor, new Runnable() { + execute(executor, new OneTimeTask() { @Override public void run() { notifyListeners0(DefaultPromise.this, dfl); @@ -586,7 +587,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } else { final GenericFutureListener> l = (GenericFutureListener>) listeners; - execute(executor, new Runnable() { + execute(executor, new OneTimeTask() { @Override public void run() { notifyListener0(DefaultPromise.this, l); @@ -612,7 +613,9 @@ public class DefaultPromise extends AbstractFuture implements Promise { private void notifyLateListener(final GenericFutureListener l) { final EventExecutor executor = executor(); if (executor.inEventLoop()) { - if (listeners == null && lateListeners == null) { + // Execute immediately if late listeners is empty. This allows subsequent late listeners + // that are added after completion to be notified immediately and preserver order. + if (listeners == null && (lateListeners == null || lateListeners.isEmpty())) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { @@ -658,7 +661,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } } - execute(eventExecutor, new Runnable() { + execute(eventExecutor, new OneTimeTask() { @Override public void run() { notifyListener0(future, l); @@ -752,7 +755,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { if (listeners instanceof GenericProgressiveFutureListener[]) { final GenericProgressiveFutureListener[] array = (GenericProgressiveFutureListener[]) listeners; - execute(executor, new Runnable() { + execute(executor, new OneTimeTask() { @Override public void run() { notifyProgressiveListeners0(self, array, progress, total); @@ -761,7 +764,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } else { final GenericProgressiveFutureListener> l = (GenericProgressiveFutureListener>) listeners; - execute(executor, new Runnable() { + execute(executor, new OneTimeTask() { @Override public void run() { notifyProgressiveListener0(self, l, progress, total); diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index ea7a56e654..59085d6b89 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -23,9 +23,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; @SuppressWarnings("unchecked") public class DefaultPromiseTest { @@ -84,54 +88,56 @@ public class DefaultPromiseTest { @Test public void testListenerNotifyOrder() throws Exception { EventExecutor executor = new TestEventExecutor(); - final BlockingQueue> listeners = new LinkedBlockingQueue>(); - int runs = 100000; + try { + final BlockingQueue> listeners = new LinkedBlockingQueue>(); + int runs = 100000; - for (int i = 0; i < runs; i++) { - final Promise promise = new DefaultPromise(executor); - final FutureListener listener1 = new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - listeners.add(this); - } - }; - final FutureListener listener2 = new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - listeners.add(this); - } - }; - final FutureListener listener4 = new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - listeners.add(this); - } - }; - final FutureListener listener3 = new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - // Ensure listener4 is notified *after* this method returns to maintain the order. - future.addListener(listener4); - listeners.add(this); - } - }; + for (int i = 0; i < runs; i++) { + final Promise promise = new DefaultPromise(executor); + final FutureListener listener1 = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + listeners.add(this); + } + }; + final FutureListener listener2 = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + listeners.add(this); + } + }; + final FutureListener listener4 = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + listeners.add(this); + } + }; + final FutureListener listener3 = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + listeners.add(this); + future.addListener(listener4); + } + }; - GlobalEventExecutor.INSTANCE.execute(new Runnable() { - @Override - public void run() { - promise.setSuccess(null); - } - }); + GlobalEventExecutor.INSTANCE.execute(new Runnable() { + @Override + public void run() { + promise.setSuccess(null); + } + }); - promise.addListener(listener1).addListener(listener2).addListener(listener3); + promise.addListener(listener1).addListener(listener2).addListener(listener3); - assertSame("Fail during run " + i + " / " + runs, listener1, listeners.take()); - assertSame("Fail during run " + i + " / " + runs, listener2, listeners.take()); - assertSame("Fail during run " + i + " / " + runs, listener3, listeners.take()); - assertSame("Fail during run " + i + " / " + runs, listener4, listeners.take()); - assertTrue("Fail during run " + i + " / " + runs, listeners.isEmpty()); + assertSame("Fail 1 during run " + i + " / " + runs, listener1, listeners.take()); + assertSame("Fail 2 during run " + i + " / " + runs, listener2, listeners.take()); + assertSame("Fail 3 during run " + i + " / " + runs, listener3, listeners.take()); + assertSame("Fail 4 during run " + i + " / " + runs, listener4, listeners.take()); + assertTrue("Fail during run " + i + " / " + runs, listeners.isEmpty()); + } + } finally { + executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); } - executor.shutdownGracefully().sync(); } @Test @@ -145,7 +151,7 @@ public class DefaultPromiseTest { @Test(timeout = 2000) public void testPromiseListenerAddWhenCompleteFailure() throws Exception { - testPromiseListenerAddWhenComplete(new RuntimeException()); + testPromiseListenerAddWhenComplete(fakeException()); } @Test(timeout = 2000) @@ -153,6 +159,103 @@ public class DefaultPromiseTest { testPromiseListenerAddWhenComplete(null); } + @Test(timeout = 2000) + public void testLateListenerIsOrderedCorrectlySuccess() throws InterruptedException { + final EventExecutor executor = new TestEventExecutor(); + try { + testLateListenerIsOrderedCorrectly(null); + } finally { + executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); + } + } + + @Test(timeout = 2000) + public void testLateListenerIsOrderedCorrectlyFailure() throws InterruptedException { + final EventExecutor executor = new TestEventExecutor(); + try { + testLateListenerIsOrderedCorrectly(fakeException()); + } finally { + executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); + } + } + + /** + * This test is mean to simulate the following sequence of events, which all take place on the I/O thread: + *
    + *
  1. A write is done
  2. + *
  3. The write operation completes, and the promise state is changed to done
  4. + *
  5. A listener is added to the return from the write. The {@link FutureListener#operationComplete()} updates + * state which must be invoked before the response to the previous write is read.
  6. + *
  7. The write operation
  8. + *
+ */ + private static void testLateListenerIsOrderedCorrectly(Throwable cause) throws InterruptedException { + final EventExecutor executor = new TestEventExecutor(); + try { + final AtomicInteger state = new AtomicInteger(); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(2); + final Promise promise = new DefaultPromise(executor); + + // Add a listener before completion so "lateListener" is used next time we add a listener. + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + assertTrue(state.compareAndSet(0, 1)); + } + }); + + // Simulate write operation completing, which will execute listeners in another thread. + if (cause == null) { + promise.setSuccess(null); + } else { + promise.setFailure(cause); + } + + // Add a "late listener" + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + assertTrue(state.compareAndSet(1, 2)); + latch1.countDown(); + } + }); + + // Wait for the listeners and late listeners to be completed. + latch1.await(); + assertEquals(2, state.get()); + + // This is the important listener. A late listener that is added after all late listeners + // have completed, and needs to update state before a read operation (on the same executor). + executor.execute(new Runnable() { + @Override + public void run() { + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + assertTrue(state.compareAndSet(2, 3)); + latch2.countDown(); + } + }); + } + }); + + // Simulate a read operation being queued up in the executor. + executor.execute(new Runnable() { + @Override + public void run() { + // This is the key, we depend upon the state being set in the next listener. + assertEquals(3, state.get()); + latch2.countDown(); + } + }); + + latch2.await(); + } finally { + executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); + } + } + private static void testPromiseListenerAddWhenComplete(Throwable cause) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final Promise promise = new DefaultPromise(ImmediateEventExecutor.INSTANCE); @@ -228,4 +331,8 @@ public class DefaultPromiseTest { } } } + + private static RuntimeException fakeException() { + return new RuntimeException("fake exception"); + } }