Optimize DefaultChannelPipeline in terms of memory usage and initialization time

Motivation:
Each of DefaultChannelPipeline instance creates an head and tail that wraps a handler. These are used to chain together other DefaultChannelHandlerContext that are created once a new ChannelHandler is added. There are a few things here that can be improved in terms of memory usage and initialization time.

Modification:
- Only generate the name for the tail and head one time as it will never change anyway
- Rename DefaultChannelHandlerContext to AbstractChannelHandlerContext and make it abstract
- Create a new DefaultChannelHandlerContext that is used when a ChannelHandler is added to the DefaultChannelPipeline
- Rename TailHandler to TailContext and HeadHandler to HeadContext and let them extend AbstractChannelHandlerContext. This way we can save 2 object creations per DefaultChannelPipeline

 Result:
- Less memory usage because we have 2 less objects per DefaultChannelPipeline
- Faster creation of DefaultChannelPipeline as we not need to generate the name for the head and tail
This commit is contained in:
Norman Maurer 2014-06-10 12:16:50 +02:00
parent e3c76ec106
commit 8180f7922f
6 changed files with 493 additions and 427 deletions

View File

@ -0,0 +1,379 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
private final boolean inbound;
private final boolean outbound;
private final AbstractChannel channel;
private final DefaultChannelPipeline pipeline;
private final String name;
private boolean removed;
final ChannelHandlerInvoker invoker;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// These needs to be volatile as otherwise an other Thread may see an half initialized instance.
// See the JMM for more details
volatile Runnable invokeChannelReadCompleteTask;
volatile Runnable invokeReadTask;
volatile Runnable invokeChannelWritableStateChangedTask;
volatile Runnable invokeFlushTask;
AbstractChannelHandlerContext(
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker,
String name, boolean inbound, boolean outbound) {
if (name == null) {
throw new NullPointerException("name");
}
channel = pipeline.channel;
this.pipeline = pipeline;
this.name = name;
this.invoker = invoker;
this.inbound = inbound;
this.outbound = outbound;
}
/** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
void teardown() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
teardown0();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
teardown0();
}
});
}
}
private void teardown0() {
AbstractChannelHandlerContext prev = this.prev;
if (prev != null) {
synchronized (pipeline) {
pipeline.remove0(this);
}
prev.teardown();
}
}
@Override
public Channel channel() {
return channel;
}
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
@Override
public ByteBufAllocator alloc() {
return channel().config().getAllocator();
}
@Override
public EventExecutor executor() {
return invoker().executor();
}
public ChannelHandlerInvoker invoker() {
if (invoker == null) {
return channel.unsafe().invoker();
} else {
return invoker;
}
}
@Override
public String name() {
return name;
}
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
return channel.attr(key);
}
@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
return channel.hasAttr(key);
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelRegistered(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelUnregistered(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelActive() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelActive(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelInactive() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelInactive(next);
return this;
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
AbstractChannelHandlerContext next = this.next;
next.invoker().invokeExceptionCaught(next, cause);
return this;
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object event) {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeUserEventTriggered(next, event);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
AbstractChannelHandlerContext next = findContextInbound();
ReferenceCountUtil.touch(msg, next);
next.invoker().invokeChannelRead(next, msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelWritabilityChanged(next);
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, newPromise());
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return connect(remoteAddress, newPromise());
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return connect(remoteAddress, localAddress, newPromise());
}
@Override
public ChannelFuture disconnect() {
return disconnect(newPromise());
}
@Override
public ChannelFuture close() {
return close(newPromise());
}
@Override
public ChannelFuture deregister() {
return deregister(newPromise());
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeBind(next, localAddress, promise);
return promise;
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
return promise;
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
if (!channel().metadata().hasDisconnect()) {
return close(promise);
}
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeDisconnect(next, promise);
return promise;
}
@Override
public ChannelFuture close(ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeClose(next, promise);
return promise;
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeDeregister(next, promise);
return promise;
}
@Override
public ChannelHandlerContext read() {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeRead(next);
return this;
}
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
ReferenceCountUtil.touch(msg, next);
next.invoker().invokeWrite(next, msg, promise);
return promise;
}
@Override
public ChannelHandlerContext flush() {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeFlush(next);
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
ReferenceCountUtil.touch(msg, next);
ChannelHandlerInvoker invoker = next.invoker();
invoker.invokeWrite(next, msg, promise);
invoker.invokeFlush(next);
return promise;
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
@Override
public ChannelProgressivePromise newProgressivePromise() {
return new DefaultChannelProgressivePromise(channel(), executor());
}
@Override
public ChannelFuture newSucceededFuture() {
ChannelFuture succeededFuture = this.succeededFuture;
if (succeededFuture == null) {
this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
}
return succeededFuture;
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return new FailedChannelFuture(channel(), executor(), cause);
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
@Override
public ChannelPromise voidPromise() {
return channel.voidPromise();
}
void setRemoved() {
removed = true;
}
@Override
public boolean isRemoved() {
return removed;
}
@Override
public String toHintString() {
return '\'' + name + "' will handle the message from this point.";
}
@Override
public String toString() {
return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')';
}
}

View File

@ -181,7 +181,7 @@ public class ChannelHandlerAppender extends ChannelInboundHandlerAdapter {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
added = true; added = true;
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
DefaultChannelPipeline pipeline = (DefaultChannelPipeline) dctx.pipeline(); DefaultChannelPipeline pipeline = (DefaultChannelPipeline) dctx.pipeline();
String name = dctx.name(); String name = dctx.name();
try { try {

View File

@ -1,125 +1,31 @@
/* /*
* Copyright 2012 The Netty Project * Copyright 2014 The Netty Project
* *
* The Netty Project licenses this file to you under the Apache License, * The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance * version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at: * with the License. You may obtain a copy of the License at:
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ByteBufAllocator; final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
final class DefaultChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev;
private final boolean inbound;
private final boolean outbound;
private final AbstractChannel channel;
private final DefaultChannelPipeline pipeline;
private final String name;
private final ChannelHandler handler; private final ChannelHandler handler;
private boolean removed;
final ChannelHandlerInvoker invoker;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// These needs to be volatile as otherwise an other Thread may see an half initialized instance.
// See the JMM for more details
volatile Runnable invokeChannelReadCompleteTask;
volatile Runnable invokeReadTask;
volatile Runnable invokeChannelWritableStateChangedTask;
volatile Runnable invokeFlushTask;
DefaultChannelHandlerContext( DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) { DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
super(pipeline, invoker, name, isInbound(handler), isOutbound(handler));
if (name == null) {
throw new NullPointerException("name");
}
if (handler == null) { if (handler == null) {
throw new NullPointerException("handler"); throw new NullPointerException("handler");
} }
channel = pipeline.channel;
this.pipeline = pipeline;
this.name = name;
this.handler = handler; this.handler = handler;
this.invoker = invoker;
inbound = handler instanceof ChannelInboundHandler;
outbound = handler instanceof ChannelOutboundHandler;
}
/** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
void teardown() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
teardown0();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
teardown0();
}
});
}
}
private void teardown0() {
DefaultChannelHandlerContext prev = this.prev;
if (prev != null) {
synchronized (pipeline) {
pipeline.remove0(this);
}
prev.teardown();
}
}
@Override
public Channel channel() {
return channel;
}
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
@Override
public ByteBufAllocator alloc() {
return channel().config().getAllocator();
}
@Override
public EventExecutor executor() {
return invoker().executor();
}
public ChannelHandlerInvoker invoker() {
if (invoker == null) {
return channel.unsafe().invoker();
} else {
return invoker;
}
} }
@Override @Override
@ -127,262 +33,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return handler; return handler;
} }
@Override private static boolean isInbound(ChannelHandler handler) {
public String name() { return handler instanceof ChannelInboundHandler;
return name;
} }
@Override private static boolean isOutbound(ChannelHandler handler) {
public <T> Attribute<T> attr(AttributeKey<T> key) { return handler instanceof ChannelOutboundHandler;
return channel.attr(key);
}
@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
return channel.hasAttr(key);
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelRegistered(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelUnregistered(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelActive() {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelActive(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelInactive() {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelInactive(next);
return this;
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
DefaultChannelHandlerContext next = this.next;
next.invoker().invokeExceptionCaught(next, cause);
return this;
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object event) {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeUserEventTriggered(next, event);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
DefaultChannelHandlerContext next = findContextInbound();
ReferenceCountUtil.touch(msg, next);
next.invoker().invokeChannelRead(next, msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelWritabilityChanged(next);
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, newPromise());
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return connect(remoteAddress, newPromise());
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return connect(remoteAddress, localAddress, newPromise());
}
@Override
public ChannelFuture disconnect() {
return disconnect(newPromise());
}
@Override
public ChannelFuture close() {
return close(newPromise());
}
@Override
public ChannelFuture deregister() {
return deregister(newPromise());
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeBind(next, localAddress, promise);
return promise;
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
return promise;
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
if (!channel().metadata().hasDisconnect()) {
return close(promise);
}
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeDisconnect(next, promise);
return promise;
}
@Override
public ChannelFuture close(ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeClose(next, promise);
return promise;
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeDeregister(next, promise);
return promise;
}
@Override
public ChannelHandlerContext read() {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeRead(next);
return this;
}
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
ReferenceCountUtil.touch(msg, next);
next.invoker().invokeWrite(next, msg, promise);
return promise;
}
@Override
public ChannelHandlerContext flush() {
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeFlush(next);
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound();
ReferenceCountUtil.touch(msg, next);
ChannelHandlerInvoker invoker = next.invoker();
invoker.invokeWrite(next, msg, promise);
invoker.invokeFlush(next);
return promise;
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
@Override
public ChannelProgressivePromise newProgressivePromise() {
return new DefaultChannelProgressivePromise(channel(), executor());
}
@Override
public ChannelFuture newSucceededFuture() {
ChannelFuture succeededFuture = this.succeededFuture;
if (succeededFuture == null) {
this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
}
return succeededFuture;
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return new FailedChannelFuture(channel(), executor(), cause);
}
private DefaultChannelHandlerContext findContextInbound() {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
private DefaultChannelHandlerContext findContextOutbound() {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
@Override
public ChannelPromise voidPromise() {
return channel.voidPromise();
}
void setRemoved() {
removed = true;
}
@Override
public boolean isRemoved() {
return removed;
}
@Override
public String toHintString() {
return '\'' + name + "' will handle the message from this point.";
}
@Override
public String toString() {
return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')';
} }
} }

View File

@ -165,7 +165,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
invokeChannelReadCompleteNow(ctx); invokeChannelReadCompleteNow(ctx);
} else { } else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
Runnable task = dctx.invokeChannelReadCompleteTask; Runnable task = dctx.invokeChannelReadCompleteTask;
if (task == null) { if (task == null) {
dctx.invokeChannelReadCompleteTask = task = new Runnable() { dctx.invokeChannelReadCompleteTask = task = new Runnable() {
@ -184,7 +184,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
invokeChannelWritabilityChangedNow(ctx); invokeChannelWritabilityChangedNow(ctx);
} else { } else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
Runnable task = dctx.invokeChannelWritableStateChangedTask; Runnable task = dctx.invokeChannelWritableStateChangedTask;
if (task == null) { if (task == null) {
dctx.invokeChannelWritableStateChangedTask = task = new Runnable() { dctx.invokeChannelWritableStateChangedTask = task = new Runnable() {
@ -307,7 +307,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
invokeReadNow(ctx); invokeReadNow(ctx);
} else { } else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
Runnable task = dctx.invokeReadTask; Runnable task = dctx.invokeReadTask;
if (task == null) { if (task == null) {
dctx.invokeReadTask = task = new Runnable() { dctx.invokeReadTask = task = new Runnable() {
@ -353,7 +353,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
invokeFlushNow(ctx); invokeFlushNow(ctx);
} else { } else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
Runnable task = dctx.invokeFlushTask; Runnable task = dctx.invokeFlushTask;
if (task == null) { if (task == null) {
dctx.invokeFlushTask = task = new Runnable() { dctx.invokeFlushTask = task = new Runnable() {

View File

@ -57,11 +57,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannel channel; final AbstractChannel channel;
final DefaultChannelHandlerContext head; final AbstractChannelHandlerContext head;
final DefaultChannelHandlerContext tail; final AbstractChannelHandlerContext tail;
private final Map<String, DefaultChannelHandlerContext> name2ctx = private final Map<String, AbstractChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4); new HashMap<String, AbstractChannelHandlerContext>(4);
/** /**
* @see #findInvoker(EventExecutorGroup) * @see #findInvoker(EventExecutorGroup)
@ -74,11 +74,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
this.channel = channel; this.channel = channel;
TailHandler tailHandler = new TailHandler(); tail = new TailContext(this);
tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler); head = new HeadContext(this);
HeadHandler headHandler = new HeadHandler(channel.unsafe());
head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
head.next = tail; head.next = tail;
tail.prev = head; tail.prev = head;
@ -112,10 +109,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return this; return this;
} }
private void addFirst0(String name, DefaultChannelHandlerContext newCtx) { private void addFirst0(String name, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx); checkMultiplicity(newCtx);
DefaultChannelHandlerContext nextCtx = head.next; AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head; newCtx.prev = head;
newCtx.next = nextCtx; newCtx.next = nextCtx;
head.next = newCtx; head.next = newCtx;
@ -149,10 +146,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return this; return this;
} }
private void addLast0(final String name, DefaultChannelHandlerContext newCtx) { private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx); checkMultiplicity(newCtx);
DefaultChannelHandlerContext prev = tail.prev; AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev; newCtx.prev = prev;
newCtx.next = tail; newCtx.next = tail;
prev.next = newCtx; prev.next = newCtx;
@ -171,7 +168,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
synchronized (this) { synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName); AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name); checkDuplicateName(name);
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler)); addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
} }
@ -182,14 +179,15 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public ChannelPipeline addBefore( public ChannelPipeline addBefore(
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) { ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
synchronized (this) { synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName); AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name); checkDuplicateName(name);
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler)); addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
} }
return this; return this;
} }
private void addBefore0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { private void addBefore0(
final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx); checkMultiplicity(newCtx);
newCtx.prev = ctx.prev; newCtx.prev = ctx.prev;
@ -210,7 +208,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
synchronized (this) { synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName); AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name); checkDuplicateName(name);
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler)); addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
} }
@ -221,14 +219,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public ChannelPipeline addAfter( public ChannelPipeline addAfter(
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) { ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
synchronized (this) { synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName); AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name); checkDuplicateName(name);
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler)); addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
} }
return this; return this;
} }
private void addAfter0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { private void addAfter0(final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
checkDuplicateName(name); checkDuplicateName(name);
checkMultiplicity(newCtx); checkMultiplicity(newCtx);
@ -367,7 +365,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
synchronized (cache) { synchronized (cache) {
name = cache.get(handlerType); name = cache.get(handlerType);
if (name == null) { if (name == null) {
name = StringUtil.simpleClassName(handlerType) + "#0"; name = generateName0(handlerType);
cache.put(handlerType, name); cache.put(handlerType, name);
} }
} }
@ -390,6 +388,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return name; return name;
} }
private static String generateName0(Class<?> handlerType) {
return StringUtil.simpleClassName(handlerType) + "#0";
}
@Override @Override
public ChannelPipeline remove(ChannelHandler handler) { public ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler)); remove(getContextOrDie(handler));
@ -407,10 +409,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return (T) remove(getContextOrDie(handlerType)).handler(); return (T) remove(getContextOrDie(handlerType)).handler();
} }
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) { private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail; assert ctx != head && ctx != tail;
DefaultChannelHandlerContext context; AbstractChannelHandlerContext context;
Future<?> future; Future<?> future;
synchronized (this) { synchronized (this) {
@ -438,9 +440,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return context; return context;
} }
void remove0(DefaultChannelHandlerContext ctx) { void remove0(AbstractChannelHandlerContext ctx) {
DefaultChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext prev = ctx.prev;
DefaultChannelHandlerContext next = ctx.next; AbstractChannelHandlerContext next = ctx.next;
prev.next = next; prev.next = next;
next.prev = prev; next.prev = prev;
name2ctx.remove(ctx.name()); name2ctx.remove(ctx.name());
@ -482,7 +484,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
private ChannelHandler replace( private ChannelHandler replace(
final DefaultChannelHandlerContext ctx, final String newName, final AbstractChannelHandlerContext ctx, final String newName,
ChannelHandler newHandler) { ChannelHandler newHandler) {
assert ctx != head && ctx != tail; assert ctx != head && ctx != tail;
@ -494,7 +496,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
checkDuplicateName(newName); checkDuplicateName(newName);
} }
final DefaultChannelHandlerContext newCtx = final AbstractChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler); new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
@ -520,12 +522,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return ctx.handler(); return ctx.handler();
} }
private void replace0(DefaultChannelHandlerContext oldCtx, String newName, private void replace0(AbstractChannelHandlerContext oldCtx, String newName,
DefaultChannelHandlerContext newCtx) { AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx); checkMultiplicity(newCtx);
DefaultChannelHandlerContext prev = oldCtx.prev; AbstractChannelHandlerContext prev = oldCtx.prev;
DefaultChannelHandlerContext next = oldCtx.next; AbstractChannelHandlerContext next = oldCtx.next;
newCtx.prev = prev; newCtx.prev = prev;
newCtx.next = next; newCtx.next = next;
@ -565,7 +567,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private void callHandlerAdded(final DefaultChannelHandlerContext ctx) { private void callHandlerAdded(final AbstractChannelHandlerContext ctx) {
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
ctx.executor().execute(new Runnable() { ctx.executor().execute(new Runnable() {
@Override @Override
@ -578,7 +580,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
callHandlerAdded0(ctx); callHandlerAdded0(ctx);
} }
private void callHandlerAdded0(final DefaultChannelHandlerContext ctx) { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try { try {
ctx.handler().handlerAdded(ctx); ctx.handler().handlerAdded(ctx);
} catch (Throwable t) { } catch (Throwable t) {
@ -604,7 +606,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private void callHandlerRemoved(final DefaultChannelHandlerContext ctx) { private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) {
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
ctx.executor().execute(new Runnable() { ctx.executor().execute(new Runnable() {
@Override @Override
@ -617,7 +619,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
callHandlerRemoved0(ctx); callHandlerRemoved0(ctx);
} }
private void callHandlerRemoved0(final DefaultChannelHandlerContext ctx) { private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal. // Notify the complete removal.
try { try {
ctx.handler().handlerRemoved(ctx); ctx.handler().handlerRemoved(ctx);
@ -666,7 +668,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelHandlerContext firstContext() { public ChannelHandlerContext firstContext() {
DefaultChannelHandlerContext first = head.next; AbstractChannelHandlerContext first = head.next;
if (first == tail) { if (first == tail) {
return null; return null;
} }
@ -675,7 +677,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelHandler last() { public ChannelHandler last() {
DefaultChannelHandlerContext last = tail.prev; AbstractChannelHandlerContext last = tail.prev;
if (last == head) { if (last == head) {
return null; return null;
} }
@ -684,7 +686,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelHandlerContext lastContext() { public ChannelHandlerContext lastContext() {
DefaultChannelHandlerContext last = tail.prev; AbstractChannelHandlerContext last = tail.prev;
if (last == head) { if (last == head) {
return null; return null;
} }
@ -729,7 +731,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
throw new NullPointerException("handler"); throw new NullPointerException("handler");
} }
DefaultChannelHandlerContext ctx = head.next; AbstractChannelHandlerContext ctx = head.next;
for (;;) { for (;;) {
if (ctx == null) { if (ctx == null) {
@ -750,7 +752,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
throw new NullPointerException("handlerType"); throw new NullPointerException("handlerType");
} }
DefaultChannelHandlerContext ctx = head.next; AbstractChannelHandlerContext ctx = head.next;
for (;;) { for (;;) {
if (ctx == null) { if (ctx == null) {
return null; return null;
@ -765,7 +767,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public List<String> names() { public List<String> names() {
List<String> list = new ArrayList<String>(); List<String> list = new ArrayList<String>();
DefaultChannelHandlerContext ctx = head.next; AbstractChannelHandlerContext ctx = head.next;
for (;;) { for (;;) {
if (ctx == null) { if (ctx == null) {
return list; return list;
@ -778,7 +780,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public Map<String, ChannelHandler> toMap() { public Map<String, ChannelHandler> toMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>(); Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
DefaultChannelHandlerContext ctx = head.next; AbstractChannelHandlerContext ctx = head.next;
for (;;) { for (;;) {
if (ctx == tail) { if (ctx == tail) {
return map; return map;
@ -801,7 +803,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder();
buf.append(StringUtil.simpleClassName(this)); buf.append(StringUtil.simpleClassName(this));
buf.append('{'); buf.append('{');
DefaultChannelHandlerContext ctx = head.next; AbstractChannelHandlerContext ctx = head.next;
for (;;) { for (;;) {
if (ctx == tail) { if (ctx == tail) {
break; break;
@ -998,8 +1000,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private DefaultChannelHandlerContext getContextOrDie(String name) { private AbstractChannelHandlerContext getContextOrDie(String name) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(name); AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
if (ctx == null) { if (ctx == null) {
throw new NoSuchElementException(name); throw new NoSuchElementException(name);
} else { } else {
@ -1007,8 +1009,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) { private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handler); AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) { if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName()); throw new NoSuchElementException(handler.getClass().getName());
} else { } else {
@ -1016,8 +1018,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) { private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handlerType); AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
if (ctx == null) { if (ctx == null) {
throw new NoSuchElementException(handlerType.getName()); throw new NoSuchElementException(handlerType.getName());
} else { } else {
@ -1026,7 +1028,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
// A special catch-all handler that handles both bytes and messages. // A special catch-all handler that handles both bytes and messages.
static final class TailHandler implements ChannelInboundHandler { static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
private static final String TAIL_NAME = generateName0(TailContext.class);
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
}
@Override
public ChannelHandler handler() {
return this;
}
@Override @Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@ -1074,12 +1087,31 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
} }
static final class HeadHandler extends ChannelOutboundHandlerAdapter { static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
private static final String HEAD_NAME = generateName0(HeadContext.class);
private final Unsafe unsafe; private final Unsafe unsafe;
HeadHandler(Unsafe unsafe) { HeadContext(DefaultChannelPipeline pipeline) {
this.unsafe = unsafe; super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
}
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
} }
@Override @Override

View File

@ -259,7 +259,7 @@ public class DefaultChannelPipelineTest {
pipeline.addBefore("1", "0", newHandler()); pipeline.addBefore("1", "0", newHandler());
pipeline.addAfter("10", "11", newHandler()); pipeline.addAfter("10", "11", newHandler());
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext(); AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
assertNotNull(ctx); assertNotNull(ctx);
while (ctx != null) { while (ctx != null) {
int i = toInt(ctx.name()); int i = toInt(ctx.name());
@ -548,8 +548,8 @@ public class DefaultChannelPipelineTest {
assertNull(pipeline.last()); assertNull(pipeline.last());
} }
private static int next(DefaultChannelHandlerContext ctx) { private static int next(AbstractChannelHandlerContext ctx) {
DefaultChannelHandlerContext next = ctx.next; AbstractChannelHandlerContext next = ctx.next;
if (next == null) { if (next == null) {
return Integer.MAX_VALUE; return Integer.MAX_VALUE;
} }
@ -566,7 +566,7 @@ public class DefaultChannelPipelineTest {
} }
private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) { private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext(); AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
int handlerNumber = 0; int handlerNumber = 0;
while (ctx != ((DefaultChannelPipeline) pipeline).tail) { while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
handlerNumber++; handlerNumber++;