Decouple DefaultChannelPipeline from AbstractChannel

Motivation:

DefaultChannelPipeline was tightly coupled to AbstractChannel which is not really needed.

Modifications:

Move logic of calling handlerAdded(...) for handlers that were added before the Channel was registered to DefaultChannelPipeline by making it part of the head context.

Result:

Less coupling and so be able to use DefaultChannelPipeline also with other Channel implementations that not extend AbstractChannel
This commit is contained in:
Norman Maurer 2016-05-20 12:05:32 +02:00
parent 0838f223e1
commit a729e0fcd9
3 changed files with 122 additions and 69 deletions

View File

@ -494,12 +494,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
neverRegistered = false; neverRegistered = false;
registered = true; registered = true;
if (firstRegistration) {
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
pipeline.callHandlerAddedForAllHandlers();
}
safeSetSuccess(promise); safeSetSuccess(promise);
pipeline.fireChannelRegistered(); pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing // Only fire a channelActive if the channel has never been registered. This prevents firing

View File

@ -23,6 +23,7 @@ import io.netty.util.AttributeKey;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.RecyclableMpscLinkedQueueNode; import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -117,7 +118,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireChannelRegistered() { public ChannelHandlerContext fireChannelRegistered() {
final AbstractChannelHandlerContext next = findContextInbound(); invokeChannelRegistered(findContextInbound());
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelRegistered(); next.invokeChannelRegistered();
@ -129,7 +134,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
}); });
} }
return this;
} }
private void invokeChannelRegistered() { private void invokeChannelRegistered() {
@ -146,7 +150,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireChannelUnregistered() { public ChannelHandlerContext fireChannelUnregistered() {
final AbstractChannelHandlerContext next = findContextInbound(); invokeChannelUnregistered(findContextInbound());
return this;
}
static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelUnregistered(); next.invokeChannelUnregistered();
@ -158,7 +166,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
}); });
} }
return this;
} }
private void invokeChannelUnregistered() { private void invokeChannelUnregistered() {
@ -176,6 +183,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireChannelActive() { public ChannelHandlerContext fireChannelActive() {
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
invokeChannelActive(next);
return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelActive(); next.invokeChannelActive();
@ -187,7 +199,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
}); });
} }
return this;
} }
private void invokeChannelActive() { private void invokeChannelActive() {
@ -204,7 +215,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireChannelInactive() { public ChannelHandlerContext fireChannelInactive() {
final AbstractChannelHandlerContext next = findContextInbound(); invokeChannelInactive(findContextInbound());
return this;
}
static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelInactive(); next.invokeChannelInactive();
@ -216,7 +231,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
}); });
} }
return this;
} }
private void invokeChannelInactive() { private void invokeChannelInactive() {
@ -233,12 +247,12 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
if (cause == null) { invokeExceptionCaught(next, cause);
throw new NullPointerException("cause"); return this;
} }
final AbstractChannelHandlerContext next = this.next;
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause); next.invokeExceptionCaught(cause);
@ -257,7 +271,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
} }
} }
return this;
} }
private void invokeExceptionCaught(final Throwable cause) { private void invokeExceptionCaught(final Throwable cause) {
@ -278,11 +291,12 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) { public ChannelHandlerContext fireUserEventTriggered(final Object event) {
if (event == null) { invokeUserEventTriggered(findContextInbound(), event);
throw new NullPointerException("event"); return this;
} }
final AbstractChannelHandlerContext next = findContextInbound(); static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
ObjectUtil.checkNotNull(event, "event");
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event); next.invokeUserEventTriggered(event);
@ -294,7 +308,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
}); });
} }
return this;
} }
private void invokeUserEventTriggered(Object event) { private void invokeUserEventTriggered(Object event) {
@ -311,12 +324,12 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireChannelRead(final Object msg) { public ChannelHandlerContext fireChannelRead(final Object msg) {
if (msg == null) { invokeChannelRead(findContextInbound(), msg);
throw new NullPointerException("msg"); return this;
} }
final AbstractChannelHandlerContext next = findContextInbound(); static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = pipeline.touch(msg, next); final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelRead(m); next.invokeChannelRead(m);
@ -328,7 +341,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
}); });
} }
return this;
} }
private void invokeChannelRead(Object msg) { private void invokeChannelRead(Object msg) {
@ -345,7 +357,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireChannelReadComplete() { public ChannelHandlerContext fireChannelReadComplete() {
final AbstractChannelHandlerContext next = findContextInbound(); invokeChannelReadComplete(findContextInbound());
return this;
}
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelReadComplete(); next.invokeChannelReadComplete();
@ -361,7 +377,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
executor.execute(task); executor.execute(task);
} }
return this;
} }
private void invokeChannelReadComplete() { private void invokeChannelReadComplete() {
@ -378,7 +393,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override @Override
public ChannelHandlerContext fireChannelWritabilityChanged() { public ChannelHandlerContext fireChannelWritabilityChanged() {
final AbstractChannelHandlerContext next = findContextInbound(); invokeChannelWritabilityChanged(findContextInbound());
return this;
}
static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelWritabilityChanged(); next.invokeChannelWritabilityChanged();
@ -394,7 +413,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
executor.execute(task); executor.execute(task);
} }
return this;
} }
private void invokeChannelWritabilityChanged() { private void invokeChannelWritabilityChanged() {

View File

@ -84,10 +84,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
*/ */
private boolean registered; private boolean registered;
// - protected as this should only be called from within the same package or if someone extends protected DefaultChannelPipeline(Channel channel) {
// DefaultChannelPipeline.
// - Tied to AbstractChannel as we need to ensure that callHandlerAddedForAllHandlers() is correctly called.
protected DefaultChannelPipeline(AbstractChannel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel"); this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null); succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true); voidPromise = new VoidChannelPromise(channel, true);
@ -810,18 +807,13 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public final ChannelPipeline fireChannelRegistered() { public final ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered(); AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this; return this;
} }
@Override @Override
public final ChannelPipeline fireChannelUnregistered() { public final ChannelPipeline fireChannelUnregistered() {
head.fireChannelUnregistered(); AbstractChannelHandlerContext.invokeChannelUnregistered(head);
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
return this; return this;
} }
@ -897,51 +889,43 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public final ChannelPipeline fireChannelActive() { public final ChannelPipeline fireChannelActive() {
head.fireChannelActive(); AbstractChannelHandlerContext.invokeChannelActive(head);
if (channel.config().isAutoRead()) {
channel.read();
}
return this; return this;
} }
@Override @Override
public final ChannelPipeline fireChannelInactive() { public final ChannelPipeline fireChannelInactive() {
head.fireChannelInactive(); AbstractChannelHandlerContext.invokeChannelInactive(head);
return this; return this;
} }
@Override @Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) { public final ChannelPipeline fireExceptionCaught(Throwable cause) {
head.fireExceptionCaught(cause); AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this; return this;
} }
@Override @Override
public final ChannelPipeline fireUserEventTriggered(Object event) { public final ChannelPipeline fireUserEventTriggered(Object event) {
head.fireUserEventTriggered(event); AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
return this; return this;
} }
@Override @Override
public final ChannelPipeline fireChannelRead(Object msg) { public final ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg); AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this; return this;
} }
@Override @Override
public final ChannelPipeline fireChannelReadComplete() { public final ChannelPipeline fireChannelReadComplete() {
head.fireChannelReadComplete(); AbstractChannelHandlerContext.invokeChannelReadComplete(head);
if (channel.config().isAutoRead()) {
read();
}
return this; return this;
} }
@Override @Override
public final ChannelPipeline fireChannelWritabilityChanged() { public final ChannelPipeline fireChannelWritabilityChanged() {
head.fireChannelWritabilityChanged(); AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
return this; return this;
} }
@ -1107,13 +1091,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
/** private void callHandlerAddedForAllHandlers() {
* Must be called before {@link #fireChannelRegistered()} is called the first time.
*/
final void callHandlerAddedForAllHandlers() {
// This should only called from within the EventLoop.
assert channel.eventLoop().inEventLoop();
final PendingHandlerCallback pendingHandlerCallbackHead; final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) { synchronized (this) {
assert !registered; assert !registered;
@ -1247,10 +1225,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
} }
static final class HeadContext extends AbstractChannelHandlerContext final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler { implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe; private final Unsafe unsafe;
private boolean firstRegistration = true;
HeadContext(DefaultChannelPipeline pipeline) { HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true); super(pipeline, null, HEAD_NAME, false, true);
@ -1322,6 +1301,68 @@ public class DefaultChannelPipeline implements ChannelPipeline {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause); ctx.fireExceptionCaught(cause);
} }
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
} }
private abstract static class PendingHandlerCallback extends OneTimeTask { private abstract static class PendingHandlerCallback extends OneTimeTask {