From 2d815fa75210dfeb578f380be6b71cdddf39d933 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Mon, 22 Jan 2018 11:09:17 -0800 Subject: [PATCH] DefaultChannelPipeline will not invoke handler if events are fired from handlerAdded Motiviation: DefaultChannelPipeline and AbstractChannelHandlerContext maintain state which indicates if a ChannelHandler should be invoked or not. However the state is updated to allow the handler to be invoked only after the handlerAdded method completes. If the handlerAdded method generates events which may result in other methods being invoked on that handler they will be missed. Modifications: - DefaultChannelPipeline should set the state before calling handlerAdded Result: DefaultChannelPipeline will allow events to be processed during the handlerAdded process. --- .../netty/channel/DefaultChannelPipeline.java | 4 +- .../channel/DefaultChannelPipelineTest.java | 66 ++++++++++++++++++- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 06729bc052..e9d92a1397 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -611,8 +611,10 @@ public class DefaultChannelPipeline implements ChannelPipeline { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { - ctx.handler().handlerAdded(ctx); + // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates + // any pipeline events ctx.handler() will miss them because the state will not allow it. ctx.setAddComplete(); + ctx.handler().handlerAdded(ctx); } catch (Throwable t) { boolean removed = false; try { diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index bd0727296f..6071649b34 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -59,7 +59,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class DefaultChannelPipelineTest { @@ -1102,6 +1109,63 @@ public class DefaultChannelPipelineTest { } } + @Test(timeout = 5000) + public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException { + handlerAddedStateUpdatedBeforeHandlerAddedDone(true); + } + + @Test(timeout = 5000) + public void handlerAddedStateUpdatedBeforeHandlerAddedDoneOnCallingThread() throws InterruptedException { + handlerAddedStateUpdatedBeforeHandlerAddedDone(false); + } + + private static void handlerAddedStateUpdatedBeforeHandlerAddedDone(boolean executeInEventLoop) + throws InterruptedException { + final ChannelPipeline pipeline = new LocalChannel().pipeline(); + final Object userEvent = new Object(); + final Object writeObject = new Object(); + final CountDownLatch doneLatch = new CountDownLatch(1); + + group.register(pipeline.channel()); + + Runnable r = new Runnable() { + @Override + public void run() { + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt == userEvent) { + ctx.write(writeObject); + } + ctx.fireUserEventTriggered(evt); + } + }); + pipeline.addFirst(new ChannelDuplexHandler() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + ctx.fireUserEventTriggered(userEvent); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg == writeObject) { + doneLatch.countDown(); + } + ctx.write(msg, promise); + } + }); + } + }; + + if (executeInEventLoop) { + pipeline.channel().eventLoop().execute(r); + } else { + r.run(); + } + + doneLatch.await(); + } + private static final class TestTask implements Runnable { private final ChannelPipeline pipeline;