Ensure connectPromise is not notified before fireChannelActive() is called.

Motivation:

Our contract in Channels is that the promise should always be notified before the actual callbacks of the ChannelInboundHandler are called. This was not done in the LocalChannel and so the behavior was different to other Channel implementations.

Modifications:

- First complete the ChannelPromise then call fireChannelActive()
- Guard against NPE when doClose() was called before the task was executed.

Result:

Consistent behavior between LocalChannel and other Channel implementations.
This commit is contained in:
Norman Maurer 2016-01-14 12:15:01 +01:00
parent b9ae48589b
commit 1848e73ce6
2 changed files with 59 additions and 2 deletions

View File

@ -196,8 +196,13 @@ public class LocalChannel extends AbstractChannel {
@Override
public void run() {
registerInProgress = false;
peer.pipeline().fireChannelActive();
peer.connectPromise.setSuccess();
ChannelPromise promise = peer.connectPromise;
// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
// connectPromise may be set to null if doClose() was called in the meantime.
if (promise != null && promise.trySuccess()) {
peer.pipeline().fireChannelActive();
}
}
});
}

View File

@ -34,6 +34,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -811,6 +812,57 @@ public class LocalChannelTest {
}
}
@Test(timeout = 3000)
public void testConnectFutureBeforeChannelActive() throws Exception {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
cb.group(group1)
.channel(LocalChannel.class)
.handler(new ChannelInboundHandlerAdapter());
sb.group(group2)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new TestHandler());
}
});
Channel sc = null;
Channel cc = null;
try {
// Start server
sc = sb.bind(TEST_ADDRESS).sync().channel();
cc = cb.register().sync().channel();
final ChannelPromise promise = cc.newPromise();
final Promise<Void> assertPromise = cc.eventLoop().newPromise();
cc.pipeline().addLast(new TestHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Ensure the promise was done before the handler method is triggered.
if (promise.isDone()) {
assertPromise.setSuccess(null);
} else {
assertPromise.setFailure(new AssertionError("connect promise should be done"));
}
}
});
// Connect to the server
cc.connect(sc.localAddress(), promise).sync();
assertPromise.syncUninterruptibly();
assertTrue(promise.isSuccess());
} finally {
closeChannel(cc);
closeChannel(sc);
}
}
private static final class LatchChannelFutureListener extends CountDownLatch implements ChannelFutureListener {
public LatchChannelFutureListener(int count) {
super(count);