Ensure ChannelHandler.handlerAdded(...) callback is executed directly when added from ChannelFutureListener added to the registration future.
Motivation: Commit 4c048d069d99891d6a83859b469c39b4ff0f4ae1 moved the logic of calling handlerAdded(...) to the channelRegistered(...) callback of the head of the DefaultChannelPipeline. Unfortunatlly this may execute the callbacks to late as a user may add handlers to the pipeline in the ChannelFutureListener attached to the registration future. This can lead to incorrect ordering. Modifications: Ensure we always invoke ChannelHandler.handlerAdded(...) for all handlers before the registration promise is notified. Result: Not possible of incorrect ordering or missed events.
This commit is contained in:
parent
833772ca46
commit
f76e50e530
@ -448,6 +448,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
neverRegistered = false;
|
||||
registered = true;
|
||||
|
||||
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
|
||||
// user may already fire events through the pipeline in the ChannelFutureListener.
|
||||
pipeline.invokeHandlerAddedIfNeeded();
|
||||
|
||||
safeSetSuccess(promise);
|
||||
pipeline.fireChannelRegistered();
|
||||
// Only fire a channelActive if the channel has never been registered. This prevents firing
|
||||
|
@ -61,6 +61,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
private final Channel channel;
|
||||
private Map<EventExecutorGroup, EventExecutor> childExecutors;
|
||||
private MessageSizeEstimator.Handle estimatorHandle;
|
||||
private boolean firstRegistration = true;
|
||||
|
||||
/**
|
||||
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
|
||||
@ -622,6 +623,16 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
final void invokeHandlerAddedIfNeeded() {
|
||||
assert channel.eventLoop().inEventLoop();
|
||||
if (firstRegistration) {
|
||||
firstRegistration = false;
|
||||
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
|
||||
// that were added before the registration was done.
|
||||
callHandlerAddedForAllHandlers();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ChannelHandler first() {
|
||||
ChannelHandlerContext first = firstContext();
|
||||
@ -1178,7 +1189,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
implements ChannelOutboundHandler, ChannelInboundHandler {
|
||||
|
||||
private final Unsafe unsafe;
|
||||
private boolean firstRegistration = true;
|
||||
|
||||
HeadContext(DefaultChannelPipeline pipeline) {
|
||||
super(pipeline, null, HEAD_NAME, false, true);
|
||||
@ -1253,13 +1263,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||
if (firstRegistration) {
|
||||
firstRegistration = false;
|
||||
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
|
||||
// that were added before the registration was done.
|
||||
callHandlerAddedForAllHandlers();
|
||||
}
|
||||
|
||||
invokeHandlerAddedIfNeeded();
|
||||
ctx.fireChannelRegistered();
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,10 @@ import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
import io.netty.channel.local.LocalEventLoopGroup;
|
||||
import io.netty.channel.local.LocalServerChannel;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.oio.OioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.channel.socket.oio.OioSocketChannel;
|
||||
import io.netty.util.AbstractReferenceCounted;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
@ -883,6 +887,62 @@ public class DefaultChannelPipelineTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testAddInListenerNio() throws Throwable {
|
||||
testAddInListener(new NioSocketChannel(), new NioEventLoopGroup(1));
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testAddInListenerOio() throws Throwable {
|
||||
testAddInListener(new OioSocketChannel(), new OioEventLoopGroup(1));
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testAddInListenerLocal() throws Throwable {
|
||||
testAddInListener(new LocalChannel(), new LocalEventLoopGroup(1));
|
||||
}
|
||||
|
||||
private static void testAddInListener(Channel channel, EventLoopGroup group) throws Throwable {
|
||||
ChannelPipeline pipeline1 = channel.pipeline();
|
||||
try {
|
||||
final Object event = new Object();
|
||||
final Promise<Object> promise = ImmediateEventExecutor.INSTANCE.newPromise();
|
||||
group.register(pipeline1.channel()).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
ChannelPipeline pipeline = future.channel().pipeline();
|
||||
final AtomicBoolean handlerAddedCalled = new AtomicBoolean();
|
||||
pipeline.addLast(new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
handlerAddedCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
promise.setSuccess(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
promise.setFailure(cause);
|
||||
}
|
||||
});
|
||||
if (!handlerAddedCalled.get()) {
|
||||
promise.setFailure(new AssertionError("handlerAdded(...) should have been called"));
|
||||
return;
|
||||
}
|
||||
// This event must be captured by the added handler.
|
||||
pipeline.fireUserEventTriggered(event);
|
||||
}
|
||||
});
|
||||
assertSame(event, promise.syncUninterruptibly().getNow());
|
||||
} finally {
|
||||
pipeline1.channel().close().syncUninterruptibly();
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullName() {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
|
Loading…
x
Reference in New Issue
Block a user