Ensure ChannelHandler.handlerAdded(...) callback is executed directly when added from ChannelFutureListener added to the registration future.

Motivation:

Commit 4c048d069d moved the logic of calling handlerAdded(...) to the channelRegistered(...) callback of the head of the DefaultChannelPipeline. Unfortunatlly this may execute the callbacks to late as a user may add handlers to the pipeline in the ChannelFutureListener attached to the registration future. This can lead to incorrect ordering.

Modifications:

Ensure we always invoke ChannelHandler.handlerAdded(...) for all handlers before the registration promise is notified.

Result:

Not possible of incorrect ordering or missed events.
This commit is contained in:
Norman Maurer 2016-07-06 12:03:45 +02:00
parent 79c8ec4d33
commit 50a74e95f2
3 changed files with 76 additions and 8 deletions

View File

@ -501,6 +501,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing

View File

@ -66,6 +66,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
/**
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
@ -637,6 +638,16 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
@Override
public final ChannelHandler first() {
ChannelHandlerContext first = firstContext();
@ -1218,7 +1229,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
private boolean firstRegistration = true;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
@ -1293,13 +1303,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}

View File

@ -25,6 +25,10 @@ import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
@ -887,6 +891,62 @@ public class DefaultChannelPipelineTest {
}
}
@Test(timeout = 3000)
public void testAddInListenerNio() throws Throwable {
testAddInListener(new NioSocketChannel(), new NioEventLoopGroup(1));
}
@Test(timeout = 3000)
public void testAddInListenerOio() throws Throwable {
testAddInListener(new OioSocketChannel(), new OioEventLoopGroup(1));
}
@Test(timeout = 3000)
public void testAddInListenerLocal() throws Throwable {
testAddInListener(new LocalChannel(), new DefaultEventLoopGroup(1));
}
private static void testAddInListener(Channel channel, EventLoopGroup group) throws Throwable {
ChannelPipeline pipeline1 = channel.pipeline();
try {
final Object event = new Object();
final Promise<Object> promise = ImmediateEventExecutor.INSTANCE.newPromise();
group.register(pipeline1.channel()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ChannelPipeline pipeline = future.channel().pipeline();
final AtomicBoolean handlerAddedCalled = new AtomicBoolean();
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handlerAddedCalled.set(true);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
promise.setSuccess(event);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
promise.setFailure(cause);
}
});
if (!handlerAddedCalled.get()) {
promise.setFailure(new AssertionError("handlerAdded(...) should have been called"));
return;
}
// This event must be captured by the added handler.
pipeline.fireUserEventTriggered(event);
}
});
assertSame(event, promise.syncUninterruptibly().getNow());
} finally {
pipeline1.channel().close().syncUninterruptibly();
group.shutdownGracefully();
}
}
@Test
public void testNullName() {
ChannelPipeline pipeline = new LocalChannel().pipeline();