From 4c048d069d99891d6a83859b469c39b4ff0f4ae1 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 20 May 2016 12:05:32 +0200 Subject: [PATCH] Decouple DefaultChannelPipeline from AbstractChannel Motivation: DefaultChannelPipeline was tightly coupled to AbstractChannel which is not really needed. Modifications: Move logic of calling handlerAdded(...) for handlers that were added before the Channel was registered to DefaultChannelPipeline by making it part of the head context. Result: Less coupling and so be able to use DefaultChannelPipeline also with other Channel implementations that not extend AbstractChannel --- .../io/netty/channel/AbstractChannel.java | 6 - .../AbstractChannelHandlerContext.java | 74 +++++++----- .../netty/channel/DefaultChannelPipeline.java | 111 ++++++++++++------ 3 files changed, 122 insertions(+), 69 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 11544640e9..0379092c70 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -441,12 +441,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha neverRegistered = false; registered = true; - if (firstRegistration) { - // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, - // that were added before the registration was done. - pipeline.callHandlerAddedForAllHandlers(); - } - safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index d2b27ab9bc..273dd238fd 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -20,6 +20,7 @@ import io.netty.util.DefaultAttributeMap; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.RecyclableMpscLinkedQueueNode; import io.netty.util.internal.StringUtil; @@ -112,7 +113,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireChannelRegistered() { - final AbstractChannelHandlerContext next = findContextInbound(); + invokeChannelRegistered(findContextInbound()); + return this; + } + + static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); @@ -124,7 +129,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } }); } - return this; } private void invokeChannelRegistered() { @@ -141,7 +145,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireChannelUnregistered() { - final AbstractChannelHandlerContext next = findContextInbound(); + invokeChannelUnregistered(findContextInbound()); + return this; + } + + static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelUnregistered(); @@ -153,7 +161,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } }); } - return this; } private void invokeChannelUnregistered() { @@ -171,6 +178,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireChannelActive() { final AbstractChannelHandlerContext next = findContextInbound(); + invokeChannelActive(next); + return this; + } + + static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); @@ -182,7 +194,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } }); } - return this; } private void invokeChannelActive() { @@ -199,7 +210,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireChannelInactive() { - final AbstractChannelHandlerContext next = findContextInbound(); + invokeChannelInactive(findContextInbound()); + return this; + } + + static void invokeChannelInactive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelInactive(); @@ -211,7 +226,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } }); } - return this; } private void invokeChannelInactive() { @@ -228,12 +242,12 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { - if (cause == null) { - throw new NullPointerException("cause"); - } - - final AbstractChannelHandlerContext next = this.next; + invokeExceptionCaught(next, cause); + return this; + } + static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) { + ObjectUtil.checkNotNull(cause, "cause"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeExceptionCaught(cause); @@ -252,7 +266,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } } } - return this; } private void invokeExceptionCaught(final Throwable cause) { @@ -273,11 +286,12 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireUserEventTriggered(final Object event) { - if (event == null) { - throw new NullPointerException("event"); - } + invokeUserEventTriggered(findContextInbound(), event); + return this; + } - final AbstractChannelHandlerContext next = findContextInbound(); + static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) { + ObjectUtil.checkNotNull(event, "event"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeUserEventTriggered(event); @@ -289,7 +303,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } }); } - return this; } private void invokeUserEventTriggered(Object event) { @@ -306,11 +319,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireChannelRead(final Object msg) { - if (msg == null) { - throw new NullPointerException("msg"); - } - - final AbstractChannelHandlerContext next = findContextInbound(); + invokeChannelRead(findContextInbound(), msg); + return this; + } + static void invokeChannelRead(final AbstractChannelHandlerContext next, final Object msg) { + ObjectUtil.checkNotNull(msg, "msg"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); @@ -322,7 +335,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } }); } - return this; } private void invokeChannelRead(Object msg) { @@ -339,7 +351,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireChannelReadComplete() { - final AbstractChannelHandlerContext next = findContextInbound(); + invokeChannelReadComplete(findContextInbound()); + return this; + } + + static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelReadComplete(); @@ -355,7 +371,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } executor.execute(task); } - return this; } private void invokeChannelReadComplete() { @@ -372,7 +387,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme @Override public ChannelHandlerContext fireChannelWritabilityChanged() { - final AbstractChannelHandlerContext next = findContextInbound(); + invokeChannelWritabilityChanged(findContextInbound()); + return this; + } + + static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelWritabilityChanged(); @@ -388,7 +407,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } executor.execute(task); } - return this; } private void invokeChannelWritabilityChanged() { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 27b5b42c51..d9c12896bd 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -79,10 +79,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { */ private boolean registered; - // - protected as this should only be called from within the same package or if someone extends - // DefaultChannelPipeline. - // - Tied to AbstractChannel as we need to ensure that callHandlerAddedForAllHandlers() is correctly called. - protected DefaultChannelPipeline(AbstractChannel channel) { + protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); tail = new TailContext(this); @@ -783,18 +780,13 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPipeline fireChannelRegistered() { - head.fireChannelRegistered(); + AbstractChannelHandlerContext.invokeChannelRegistered(head); return this; } @Override public final ChannelPipeline fireChannelUnregistered() { - head.fireChannelUnregistered(); - - // Remove all handlers sequentially if channel is closed and unregistered. - if (!channel.isOpen()) { - destroy(); - } + AbstractChannelHandlerContext.invokeChannelUnregistered(head); return this; } @@ -870,51 +862,43 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPipeline fireChannelActive() { - head.fireChannelActive(); - - if (channel.config().isAutoRead()) { - channel.read(); - } - + AbstractChannelHandlerContext.invokeChannelActive(head); return this; } @Override public final ChannelPipeline fireChannelInactive() { - head.fireChannelInactive(); + AbstractChannelHandlerContext.invokeChannelInactive(head); return this; } @Override public final ChannelPipeline fireExceptionCaught(Throwable cause) { - head.fireExceptionCaught(cause); + AbstractChannelHandlerContext.invokeExceptionCaught(head, cause); return this; } @Override public final ChannelPipeline fireUserEventTriggered(Object event) { - head.fireUserEventTriggered(event); + AbstractChannelHandlerContext.invokeUserEventTriggered(head, event); return this; } @Override public final ChannelPipeline fireChannelRead(Object msg) { - head.fireChannelRead(msg); + AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } @Override public final ChannelPipeline fireChannelReadComplete() { - head.fireChannelReadComplete(); - if (channel.config().isAutoRead()) { - read(); - } + AbstractChannelHandlerContext.invokeChannelReadComplete(head); return this; } @Override public final ChannelPipeline fireChannelWritabilityChanged() { - head.fireChannelWritabilityChanged(); + AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head); return this; } @@ -1055,13 +1039,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - /** - * Must be called before {@link #fireChannelRegistered()} is called the first time. - */ - final void callHandlerAddedForAllHandlers() { - // This should only called from within the EventLoop. - assert channel.eventLoop().inEventLoop(); - + private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; @@ -1195,10 +1173,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } } - static final class HeadContext extends AbstractChannelHandlerContext - implements ChannelOutboundHandler { + final class HeadContext extends AbstractChannelHandlerContext + implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; + private boolean firstRegistration = true; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); @@ -1270,6 +1249,68 @@ public class DefaultChannelPipeline implements ChannelPipeline { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + if (firstRegistration) { + firstRegistration = false; + // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, + // that were added before the registration was done. + callHandlerAddedForAllHandlers(); + } + + ctx.fireChannelRegistered(); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelUnregistered(); + + // Remove all handlers sequentially if channel is closed and unregistered. + if (!channel.isOpen()) { + destroy(); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelActive(); + + readIfIsAutoRead(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelInactive(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ctx.fireChannelRead(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelReadComplete(); + + readIfIsAutoRead(); + } + + private void readIfIsAutoRead() { + if (channel.config().isAutoRead()) { + channel.read(); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + ctx.fireUserEventTriggered(evt); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelWritabilityChanged(); + } } private abstract static class PendingHandlerCallback extends OneTimeTask {