diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index e6e3595da0..4055677dba 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -501,6 +501,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha neverRegistered = false; registered = true; + // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the + // user may already fire events through the pipeline in the ChannelFutureListener. + pipeline.invokeHandlerAddedIfNeeded(); + safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 49c3077e3b..920790e365 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -66,6 +66,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private Map childExecutors; private MessageSizeEstimator.Handle estimatorHandle; + private boolean firstRegistration = true; /** * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process @@ -637,6 +638,16 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } + final void invokeHandlerAddedIfNeeded() { + assert channel.eventLoop().inEventLoop(); + if (firstRegistration) { + firstRegistration = false; + // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, + // that were added before the registration was done. + callHandlerAddedForAllHandlers(); + } + } + @Override public final ChannelHandler first() { ChannelHandlerContext first = firstContext(); @@ -1218,7 +1229,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; - private boolean firstRegistration = true; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); @@ -1293,13 +1303,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - if (firstRegistration) { - firstRegistration = false; - // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, - // that were added before the registration was done. - callHandlerAddedForAllHandlers(); - } - + invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 1a4faa4401..83eefbc783 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -25,6 +25,10 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.socket.oio.OioSocketChannel; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; @@ -887,6 +891,62 @@ public class DefaultChannelPipelineTest { } } + @Test(timeout = 3000) + public void testAddInListenerNio() throws Throwable { + testAddInListener(new NioSocketChannel(), new NioEventLoopGroup(1)); + } + + @Test(timeout = 3000) + public void testAddInListenerOio() throws Throwable { + testAddInListener(new OioSocketChannel(), new OioEventLoopGroup(1)); + } + + @Test(timeout = 3000) + public void testAddInListenerLocal() throws Throwable { + testAddInListener(new LocalChannel(), new DefaultEventLoopGroup(1)); + } + + private static void testAddInListener(Channel channel, EventLoopGroup group) throws Throwable { + ChannelPipeline pipeline1 = channel.pipeline(); + try { + final Object event = new Object(); + final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); + group.register(pipeline1.channel()).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + ChannelPipeline pipeline = future.channel().pipeline(); + final AtomicBoolean handlerAddedCalled = new AtomicBoolean(); + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + handlerAddedCalled.set(true); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + promise.setSuccess(event); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + promise.setFailure(cause); + } + }); + if (!handlerAddedCalled.get()) { + promise.setFailure(new AssertionError("handlerAdded(...) should have been called")); + return; + } + // This event must be captured by the added handler. + pipeline.fireUserEventTriggered(event); + } + }); + assertSame(event, promise.syncUninterruptibly().getNow()); + } finally { + pipeline1.channel().close().syncUninterruptibly(); + group.shutdownGracefully(); + } + } + @Test public void testNullName() { ChannelPipeline pipeline = new LocalChannel().pipeline();