From 1848e73ce6ce563af315b6a51b60e2cd1a164fbe Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 14 Jan 2016 12:15:01 +0100 Subject: [PATCH] Ensure connectPromise is not notified before fireChannelActive() is called. Motivation: Our contract in Channels is that the promise should always be notified before the actual callbacks of the ChannelInboundHandler are called. This was not done in the LocalChannel and so the behavior was different to other Channel implementations. Modifications: - First complete the ChannelPromise then call fireChannelActive() - Guard against NPE when doClose() was called before the task was executed. Result: Consistent behavior between LocalChannel and other Channel implementations. --- .../io/netty/channel/local/LocalChannel.java | 9 +++- .../netty/channel/local/LocalChannelTest.java | 52 +++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index e9febeac70..b62191edd5 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -196,8 +196,13 @@ public class LocalChannel extends AbstractChannel { @Override public void run() { registerInProgress = false; - peer.pipeline().fireChannelActive(); - peer.connectPromise.setSuccess(); + ChannelPromise promise = peer.connectPromise; + + // Only trigger fireChannelActive() if the promise was not null and was not completed yet. + // connectPromise may be set to null if doClose() was called in the meantime. + if (promise != null && promise.trySuccess()) { + peer.pipeline().fireChannelActive(); + } } }); } diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index bac0872005..72985d5731 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -34,6 +34,7 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -811,6 +812,57 @@ public class LocalChannelTest { } } + @Test(timeout = 3000) + public void testConnectFutureBeforeChannelActive() throws Exception { + Bootstrap cb = new Bootstrap(); + ServerBootstrap sb = new ServerBootstrap(); + + cb.group(group1) + .channel(LocalChannel.class) + .handler(new ChannelInboundHandlerAdapter()); + + sb.group(group2) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new TestHandler()); + } + }); + + Channel sc = null; + Channel cc = null; + try { + // Start server + sc = sb.bind(TEST_ADDRESS).sync().channel(); + + cc = cb.register().sync().channel(); + + final ChannelPromise promise = cc.newPromise(); + final Promise assertPromise = cc.eventLoop().newPromise(); + + cc.pipeline().addLast(new TestHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Ensure the promise was done before the handler method is triggered. + if (promise.isDone()) { + assertPromise.setSuccess(null); + } else { + assertPromise.setFailure(new AssertionError("connect promise should be done")); + } + } + }); + // Connect to the server + cc.connect(sc.localAddress(), promise).sync(); + + assertPromise.syncUninterruptibly(); + assertTrue(promise.isSuccess()); + } finally { + closeChannel(cc); + closeChannel(sc); + } + } + private static final class LatchChannelFutureListener extends CountDownLatch implements ChannelFutureListener { public LatchChannelFutureListener(int count) { super(count);