From ecc8fb1b896001e48d5b6bc7afa538f9b3ab1aca Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 20 Feb 2014 14:57:29 +0100 Subject: [PATCH] Fix a regression which could lead to GenericFutureListeners never been notifed. Part of [#2186]. This regression was introduced by commit c97f2d2de00ad74835067cb6f5a62cd4651d1161 --- .../netty/util/concurrent/DefaultPromise.java | 2 +- .../util/concurrent/DefaultPromiseTest.java | 86 +++++++++++++++---- 2 files changed, 68 insertions(+), 20 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index fcacd61832..6586a615ea 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -567,9 +567,9 @@ public class DefaultPromise extends AbstractFuture implements Promise { final GenericFutureListener> l = (GenericFutureListener>) listeners; notifyListener0(this, l); - this.listeners = null; } } finally { + this.listeners = null; LISTENER_STACK_DEPTH.set(stackDepth); } return; diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index 519802c449..26f304b09f 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -19,8 +19,10 @@ package io.netty.util.concurrent; import org.junit.Test; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; @@ -81,26 +83,9 @@ public class DefaultPromiseTest { @Test public void testListenerNotifyOrder() throws Exception { - SingleThreadEventExecutor executor = - new SingleThreadEventExecutor(null, Executors.defaultThreadFactory(), true) { - @Override - protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } - - if (confirmShutdown()) { - break; - } - } - } - }; - + EventExecutor executor = new TestEventExecutor(); final BlockingQueue> listeners = new LinkedBlockingQueue>(); - int runs = 20000; + int runs = 100000; for (int i = 0; i < runs; i++) { final Promise promise = new DefaultPromise(executor); @@ -148,4 +133,67 @@ public class DefaultPromiseTest { } executor.shutdownGracefully().sync(); } + + @Test + public void testListenerNotifyLater() throws Exception { + // Testing first execution path in DefaultPromise + testListenerNotifyLater(1); + + // Testing second execution path in DefaultPromise + testListenerNotifyLater(2); + } + + private static void testListenerNotifyLater(final int numListenersBefore) throws Exception { + EventExecutor executor = new TestEventExecutor(); + int expectedCount = numListenersBefore + 2; + final CountDownLatch latch = new CountDownLatch(expectedCount); + final FutureListener listener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + latch.countDown(); + } + }; + final Promise promise = new DefaultPromise(executor); + executor.execute(new Runnable() { + @Override + public void run() { + for (int i = 0; i < numListenersBefore; i++) { + promise.addListener(listener); + } + promise.setSuccess(null); + + GlobalEventExecutor.INSTANCE.execute(new Runnable() { + @Override + public void run() { + promise.addListener(listener); + } + }); + promise.addListener(listener); + } + }); + + assertTrue("Should have notifed " + expectedCount + " listeners", latch.await(5, TimeUnit.SECONDS)); + executor.shutdownGracefully().sync(); + } + + private static final class TestEventExecutor extends SingleThreadEventExecutor { + TestEventExecutor() { + super(null, Executors.defaultThreadFactory(), true); + } + + @Override + protected void run() { + for (;;) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } + + if (confirmShutdown()) { + break; + } + } + } + } }