Ensure connectPromise is not notified before fireChannelActive() is called.
Motivation: Our contract in Channels is that the promise should always be notified before the actual callbacks of the ChannelInboundHandler are called. This was not done in the LocalChannel and so the behavior was different to other Channel implementations. Modifications: - First complete the ChannelPromise then call fireChannelActive() - Guard against NPE when doClose() was called before the task was executed. Result: Consistent behavior between LocalChannel and other Channel implementations.
This commit is contained in:
parent
53323530c1
commit
8bfbb35979
@ -194,8 +194,13 @@ public class LocalChannel extends AbstractChannel {
|
||||
@Override
|
||||
public void run() {
|
||||
registerInProgress = false;
|
||||
peer.pipeline().fireChannelActive();
|
||||
peer.connectPromise.setSuccess();
|
||||
ChannelPromise promise = peer.connectPromise;
|
||||
|
||||
// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
|
||||
// connectPromise may be set to null if doClose() was called in the meantime.
|
||||
if (promise != null && promise.trySuccess()) {
|
||||
peer.pipeline().fireChannelActive();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
@ -810,6 +811,57 @@ public class LocalChannelTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testConnectFutureBeforeChannelActive() throws Exception {
|
||||
Bootstrap cb = new Bootstrap();
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
|
||||
cb.group(group1)
|
||||
.channel(LocalChannel.class)
|
||||
.handler(new ChannelInboundHandlerAdapter());
|
||||
|
||||
sb.group(group2)
|
||||
.channel(LocalServerChannel.class)
|
||||
.childHandler(new ChannelInitializer<LocalChannel>() {
|
||||
@Override
|
||||
public void initChannel(LocalChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new TestHandler());
|
||||
}
|
||||
});
|
||||
|
||||
Channel sc = null;
|
||||
Channel cc = null;
|
||||
try {
|
||||
// Start server
|
||||
sc = sb.bind(TEST_ADDRESS).sync().channel();
|
||||
|
||||
cc = cb.register().sync().channel();
|
||||
|
||||
final ChannelPromise promise = cc.newPromise();
|
||||
final Promise<Void> assertPromise = cc.eventLoop().newPromise();
|
||||
|
||||
cc.pipeline().addLast(new TestHandler() {
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
// Ensure the promise was done before the handler method is triggered.
|
||||
if (promise.isDone()) {
|
||||
assertPromise.setSuccess(null);
|
||||
} else {
|
||||
assertPromise.setFailure(new AssertionError("connect promise should be done"));
|
||||
}
|
||||
}
|
||||
});
|
||||
// Connect to the server
|
||||
cc.connect(sc.localAddress(), promise).sync();
|
||||
|
||||
assertPromise.syncUninterruptibly();
|
||||
assertTrue(promise.isSuccess());
|
||||
} finally {
|
||||
closeChannel(cc);
|
||||
closeChannel(sc);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class LatchChannelFutureListener extends CountDownLatch implements ChannelFutureListener {
|
||||
public LatchChannelFutureListener(int count) {
|
||||
super(count);
|
||||
|
Loading…
x
Reference in New Issue
Block a user