[#5639] Ensure fireChannelActive() is also called if Channel is closed in connect promise.

Motivation:

We need to ensure we also call fireChannelActive() if the Channel is directly closed in a ChannelFutureListener that is belongs to the promise for the connect. Otherwise we will see missing active events.

Modifications:

Ensure we always call fireChannelActive() if the Channel was active.

Result:

No missing events.
This commit is contained in:
Norman Maurer 2016-08-15 14:26:19 +02:00
parent 2c1f17faa2
commit 5e148d5670
4 changed files with 59 additions and 3 deletions

View File

@ -18,6 +18,7 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ImmediateEventExecutor;
@ -25,6 +26,8 @@ import io.netty.util.concurrent.Promise;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.Assert.*;
@ -67,6 +70,46 @@ public class SocketConnectTest extends AbstractSocketTest {
}
}
@Test(timeout = 3000)
public void testChannelEventsFiredWhenClosedDirectly() throws Throwable {
run();
}
public void testChannelEventsFiredWhenClosedDirectly(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final BlockingQueue<Integer> events = new LinkedBlockingQueue<Integer>();
Channel sc = null;
Channel cc = null;
try {
sb.childHandler(new ChannelInboundHandlerAdapter());
sc = sb.bind(0).syncUninterruptibly().channel();
cb.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
events.add(0);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
events.add(1);
}
});
// Connect and directly close again.
cc = cb.connect(sc.localAddress()).addListener(ChannelFutureListener.CLOSE).
syncUninterruptibly().channel();
assertEquals(0, events.take().intValue());
assertEquals(1, events.take().intValue());
} finally {
if (cc != null) {
cc.close();
}
if (sc != null) {
sc.close();
}
}
}
private static void assertLocalAddress(InetSocketAddress address) {
assertTrue(address.getPort() > 0);
assertFalse(address.getAddress().isAnyLocalAddress());

View File

@ -834,12 +834,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
}
active = true;
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
if (!wasActive && active) {
pipeline().fireChannelActive();
}

View File

@ -296,12 +296,16 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return;
}
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
if (!wasActive && active) {
pipeline().fireChannelActive();
}

View File

@ -68,8 +68,13 @@ public abstract class AbstractOioChannel extends AbstractChannel {
try {
boolean wasActive = isActive();
doConnect(remoteAddress, localAddress);
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
safeSetSuccess(promise);
if (!wasActive && isActive()) {
if (!wasActive && active) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {