diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 06729bc052..e9d92a1397 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -611,8 +611,10 @@ public class DefaultChannelPipeline implements ChannelPipeline { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { - ctx.handler().handlerAdded(ctx); + // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates + // any pipeline events ctx.handler() will miss them because the state will not allow it. ctx.setAddComplete(); + ctx.handler().handlerAdded(ctx); } catch (Throwable t) { boolean removed = false; try { diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index bd0727296f..6071649b34 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -59,7 +59,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class DefaultChannelPipelineTest { @@ -1102,6 +1109,63 @@ public class DefaultChannelPipelineTest { } } + @Test(timeout = 5000) + public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException { + handlerAddedStateUpdatedBeforeHandlerAddedDone(true); + } + + @Test(timeout = 5000) + public void handlerAddedStateUpdatedBeforeHandlerAddedDoneOnCallingThread() throws InterruptedException { + handlerAddedStateUpdatedBeforeHandlerAddedDone(false); + } + + private static void handlerAddedStateUpdatedBeforeHandlerAddedDone(boolean executeInEventLoop) + throws InterruptedException { + final ChannelPipeline pipeline = new LocalChannel().pipeline(); + final Object userEvent = new Object(); + final Object writeObject = new Object(); + final CountDownLatch doneLatch = new CountDownLatch(1); + + group.register(pipeline.channel()); + + Runnable r = new Runnable() { + @Override + public void run() { + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt == userEvent) { + ctx.write(writeObject); + } + ctx.fireUserEventTriggered(evt); + } + }); + pipeline.addFirst(new ChannelDuplexHandler() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + ctx.fireUserEventTriggered(userEvent); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg == writeObject) { + doneLatch.countDown(); + } + ctx.write(msg, promise); + } + }); + } + }; + + if (executeInEventLoop) { + pipeline.channel().eventLoop().execute(r); + } else { + r.run(); + } + + doneLatch.await(); + } + private static final class TestTask implements Runnable { private final ChannelPipeline pipeline;