From 6b17aaad713253d370afd3634edc314a5527d86f Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 7 Jun 2014 12:02:59 +0000 Subject: [PATCH] Optimize DefaultChannelPipeline in terms of memory usage and initialization time Motivation: Each of DefaultChannelPipeline instance creates an head and tail that wraps a handler. These are used to chain together other DefaultChannelHandlerContext that are created once a new ChannelHandler is added. There are a few things here that can be improved in terms of memory usage and initialization time. Modification: - Only generate the name for the tail and head one time as it will never change anyway - Only compute the skipFlags for the tail and head one time as it will never change - Rename DefaultChannelHandlerContext to AbstractChannelHandlerContext and make it abstract - Create a new DefaultChannelHandlerContext that is used when a ChannelHandler is added to the DefaultChannelPipeline - Rename TailHandler to TailContext and HeadHandler to HeadContext and let them extend AbstractChannelHandlerContext. This way we can save 2 object creations per DefaultChannelPipeline Result: - Less memory usage because we have 2 less objects per DefaultChannelPipeline - Faster creation of DefaultChannelPipeline as we not need to compute the skipFlags and not need to generate the name for the head and tail --- .../AbstractChannelHandlerContext.java | 546 +++++++++++++++++ .../netty/channel/ChannelHandlerAppender.java | 2 +- .../channel/DefaultChannelHandlerContext.java | 559 +----------------- .../channel/DefaultChannelHandlerInvoker.java | 8 +- .../netty/channel/DefaultChannelPipeline.java | 252 ++++++-- .../channel/DefaultChannelPipelineTest.java | 8 +- 6 files changed, 771 insertions(+), 604 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java new file mode 100644 index 0000000000..ad0b51ff91 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -0,0 +1,546 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandler.Skip; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ResourceLeakHint; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; + +import java.net.SocketAddress; +import java.util.WeakHashMap; + +abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { + + // This class keeps an integer member field 'skipFlags' whose each bit tells if the corresponding handler method + // is annotated with @Skip. 'skipFlags' is retrieved in runtime via the reflection API and is cached. + // The following constants signify which bit of 'skipFlags' corresponds to which handler method: + + static final int MASK_HANDLER_ADDED = 1; + static final int MASK_HANDLER_REMOVED = 1 << 1; + + private static final int MASK_EXCEPTION_CAUGHT = 1 << 2; + private static final int MASK_CHANNEL_REGISTERED = 1 << 3; + private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4; + private static final int MASK_CHANNEL_ACTIVE = 1 << 5; + private static final int MASK_CHANNEL_INACTIVE = 1 << 6; + private static final int MASK_CHANNEL_READ = 1 << 7; + private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8; + private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9; + private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10; + + private static final int MASK_BIND = 1 << 11; + private static final int MASK_CONNECT = 1 << 12; + private static final int MASK_DISCONNECT = 1 << 13; + private static final int MASK_CLOSE = 1 << 14; + private static final int MASK_DEREGISTER = 1 << 15; + private static final int MASK_READ = 1 << 16; + private static final int MASK_WRITE = 1 << 17; + private static final int MASK_FLUSH = 1 << 18; + + private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT | + MASK_CHANNEL_REGISTERED | + MASK_CHANNEL_UNREGISTERED | + MASK_CHANNEL_ACTIVE | + MASK_CHANNEL_INACTIVE | + MASK_CHANNEL_READ | + MASK_CHANNEL_READ_COMPLETE | + MASK_CHANNEL_WRITABILITY_CHANGED | + MASK_USER_EVENT_TRIGGERED; + + private static final int MASKGROUP_OUTBOUND = MASK_BIND | + MASK_CONNECT | + MASK_DISCONNECT | + MASK_CLOSE | + MASK_DEREGISTER | + MASK_READ | + MASK_WRITE | + MASK_FLUSH; + + /** + * Cache the result of the costly generation of {@link #skipFlags} in the partitioned synchronized + * {@link WeakHashMap}. + */ + @SuppressWarnings("unchecked") + private static final WeakHashMap, Integer>[] skipFlagsCache = + new WeakHashMap[Runtime.getRuntime().availableProcessors()]; + + static { + for (int i = 0; i < skipFlagsCache.length; i ++) { + skipFlagsCache[i] = new WeakHashMap, Integer>(); + } + } + + /** + * Returns an integer bitset that tells which handler methods were annotated with {@link Skip}. + * It gets the value from {@link #skipFlagsCache} if an handler of the same type were queried before. + * Otherwise, it delegates to {@link #skipFlags0(Class)} to get it. + */ + static int skipFlags(ChannelHandler handler) { + WeakHashMap, Integer> cache = + skipFlagsCache[(int) (Thread.currentThread().getId() % skipFlagsCache.length)]; + Class handlerType = handler.getClass(); + int flagsVal; + synchronized (cache) { + Integer flags = cache.get(handlerType); + if (flags != null) { + flagsVal = flags; + } else { + flagsVal = skipFlags0(handlerType); + cache.put(handlerType, Integer.valueOf(flagsVal)); + } + } + + return flagsVal; + } + + /** + * Determines the {@link #skipFlags} of the specified {@code handlerType} using the reflection API. + */ + static int skipFlags0(Class handlerType) { + int flags = 0; + try { + if (isSkippable(handlerType, "handlerAdded")) { + flags |= MASK_HANDLER_ADDED; + } + if (isSkippable(handlerType, "handlerRemoved")) { + flags |= MASK_HANDLER_REMOVED; + } + if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) { + flags |= MASK_EXCEPTION_CAUGHT; + } + if (isSkippable(handlerType, "channelRegistered")) { + flags |= MASK_CHANNEL_REGISTERED; + } + if (isSkippable(handlerType, "channelUnregistered")) { + flags |= MASK_CHANNEL_UNREGISTERED; + } + if (isSkippable(handlerType, "channelActive")) { + flags |= MASK_CHANNEL_ACTIVE; + } + if (isSkippable(handlerType, "channelInactive")) { + flags |= MASK_CHANNEL_INACTIVE; + } + if (isSkippable(handlerType, "channelRead", Object.class)) { + flags |= MASK_CHANNEL_READ; + } + if (isSkippable(handlerType, "channelReadComplete")) { + flags |= MASK_CHANNEL_READ_COMPLETE; + } + if (isSkippable(handlerType, "channelWritabilityChanged")) { + flags |= MASK_CHANNEL_WRITABILITY_CHANGED; + } + if (isSkippable(handlerType, "userEventTriggered", Object.class)) { + flags |= MASK_USER_EVENT_TRIGGERED; + } + if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) { + flags |= MASK_BIND; + } + if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) { + flags |= MASK_CONNECT; + } + if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) { + flags |= MASK_DISCONNECT; + } + if (isSkippable(handlerType, "close", ChannelPromise.class)) { + flags |= MASK_CLOSE; + } + if (isSkippable(handlerType, "deregister", ChannelPromise.class)) { + flags |= MASK_DEREGISTER; + } + if (isSkippable(handlerType, "read")) { + flags |= MASK_READ; + } + if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) { + flags |= MASK_WRITE; + } + if (isSkippable(handlerType, "flush")) { + flags |= MASK_FLUSH; + } + } catch (Exception e) { + // Should never reach here. + PlatformDependent.throwException(e); + } + + return flags; + } + + @SuppressWarnings("rawtypes") + private static boolean isSkippable( + Class handlerType, String methodName, Class... paramTypes) throws Exception { + + Class[] newParamTypes = new Class[paramTypes.length + 1]; + newParamTypes[0] = ChannelHandlerContext.class; + System.arraycopy(paramTypes, 0, newParamTypes, 1, paramTypes.length); + + return handlerType.getMethod(methodName, newParamTypes).isAnnotationPresent(Skip.class); + } + + volatile AbstractChannelHandlerContext next; + volatile AbstractChannelHandlerContext prev; + + private final AbstractChannel channel; + private final DefaultChannelPipeline pipeline; + private final String name; + private boolean removed; + + final int skipFlags; + + // Will be set to null if no child executor should be used, otherwise it will be set to the + // child executor. + final ChannelHandlerInvoker invoker; + private ChannelFuture succeededFuture; + + // Lazily instantiated tasks used to trigger events to a handler with different executor. + // These needs to be volatile as otherwise an other Thread may see an half initialized instance. + // See the JMM for more details + volatile Runnable invokeChannelReadCompleteTask; + volatile Runnable invokeReadTask; + volatile Runnable invokeFlushTask; + volatile Runnable invokeChannelWritableStateChangedTask; + + AbstractChannelHandlerContext( + DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, int skipFlags) { + + if (name == null) { + throw new NullPointerException("name"); + } + + channel = pipeline.channel; + this.pipeline = pipeline; + this.name = name; + this.invoker = invoker; + this.skipFlags = skipFlags; + } + + /** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */ + void teardown() { + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + teardown0(); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + teardown0(); + } + }); + } + } + + private void teardown0() { + AbstractChannelHandlerContext prev = this.prev; + if (prev != null) { + synchronized (pipeline) { + pipeline.remove0(this); + } + prev.teardown(); + } + } + + @Override + public Channel channel() { + return channel; + } + + @Override + public ChannelPipeline pipeline() { + return pipeline; + } + + @Override + public ByteBufAllocator alloc() { + return channel().config().getAllocator(); + } + + @Override + public EventExecutor executor() { + return invoker().executor(); + } + + @Override + public ChannelHandlerInvoker invoker() { + if (invoker == null) { + return channel.unsafe().invoker(); + } + return invoker; + } + + @Override + public String name() { + return name; + } + + @Override + public Attribute attr(AttributeKey key) { + return channel.attr(key); + } + + @Override + public boolean hasAttr(AttributeKey key) { + return channel.hasAttr(key); + } + + @Override + public ChannelHandlerContext fireChannelRegistered() { + AbstractChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelRegistered(next); + return this; + } + + @Override + public ChannelHandlerContext fireChannelUnregistered() { + AbstractChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelUnregistered(next); + return this; + } + + @Override + public ChannelHandlerContext fireChannelActive() { + AbstractChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelActive(next); + return this; + } + + @Override + public ChannelHandlerContext fireChannelInactive() { + AbstractChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelInactive(next); + return this; + } + + @Override + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + AbstractChannelHandlerContext next = findContextInbound(); + next.invoker().invokeExceptionCaught(next, cause); + return this; + } + + @Override + public ChannelHandlerContext fireUserEventTriggered(Object event) { + AbstractChannelHandlerContext next = findContextInbound(); + next.invoker().invokeUserEventTriggered(next, event); + return this; + } + + @Override + public ChannelHandlerContext fireChannelRead(Object msg) { + AbstractChannelHandlerContext next = findContextInbound(); + ReferenceCountUtil.touch(msg, next); + next.invoker().invokeChannelRead(next, msg); + return this; + } + + @Override + public ChannelHandlerContext fireChannelReadComplete() { + AbstractChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelReadComplete(next); + return this; + } + + @Override + public ChannelHandlerContext fireChannelWritabilityChanged() { + AbstractChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelWritabilityChanged(next); + return this; + } + + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return bind(localAddress, newPromise()); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return connect(remoteAddress, newPromise()); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return connect(remoteAddress, localAddress, newPromise()); + } + + @Override + public ChannelFuture disconnect() { + return disconnect(newPromise()); + } + + @Override + public ChannelFuture close() { + return close(newPromise()); + } + + @Override + public ChannelFuture deregister() { + return deregister(newPromise()); + } + + @Override + public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { + AbstractChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeBind(next, localAddress, promise); + return promise; + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + return connect(remoteAddress, null, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + AbstractChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeConnect(next, remoteAddress, localAddress, promise); + return promise; + } + + @Override + public ChannelFuture disconnect(ChannelPromise promise) { + if (!channel().metadata().hasDisconnect()) { + return close(promise); + } + + AbstractChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeDisconnect(next, promise); + return promise; + } + + @Override + public ChannelFuture close(ChannelPromise promise) { + AbstractChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeClose(next, promise); + return promise; + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + AbstractChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeDeregister(next, promise); + return promise; + } + + @Override + public ChannelHandlerContext read() { + AbstractChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeRead(next); + return this; + } + + @Override + public ChannelFuture write(Object msg) { + return write(msg, newPromise()); + } + + @Override + public ChannelFuture write(Object msg, ChannelPromise promise) { + AbstractChannelHandlerContext next = findContextOutbound(); + ReferenceCountUtil.touch(msg, next); + next.invoker().invokeWrite(next, msg, promise); + return promise; + } + + @Override + public ChannelHandlerContext flush() { + AbstractChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeFlush(next); + return this; + } + + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + AbstractChannelHandlerContext next; + next = findContextOutbound(); + ReferenceCountUtil.touch(msg, next); + next.invoker().invokeWrite(next, msg, promise); + next = findContextOutbound(); + next.invoker().invokeFlush(next); + return promise; + } + + @Override + public ChannelFuture writeAndFlush(Object msg) { + return writeAndFlush(msg, newPromise()); + } + + @Override + public ChannelPromise newPromise() { + return new DefaultChannelPromise(channel(), executor()); + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return new DefaultChannelProgressivePromise(channel(), executor()); + } + + @Override + public ChannelFuture newSucceededFuture() { + ChannelFuture succeededFuture = this.succeededFuture; + if (succeededFuture == null) { + this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor()); + } + return succeededFuture; + } + + @Override + public ChannelFuture newFailedFuture(Throwable cause) { + return new FailedChannelFuture(channel(), executor(), cause); + } + + private AbstractChannelHandlerContext findContextInbound() { + AbstractChannelHandlerContext ctx = this; + do { + ctx = ctx.next; + } while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND); + return ctx; + } + + private AbstractChannelHandlerContext findContextOutbound() { + AbstractChannelHandlerContext ctx = this; + do { + ctx = ctx.prev; + } while ((ctx.skipFlags & MASKGROUP_OUTBOUND) == MASKGROUP_OUTBOUND); + return ctx; + } + + @Override + public ChannelPromise voidPromise() { + return channel.voidPromise(); + } + + void setRemoved() { + removed = true; + } + + @Override + public boolean isRemoved() { + return removed; + } + + @Override + public String toHintString() { + return '\'' + name + "' will handle the message from this point."; + } + + @Override + public String toString() { + return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')'; + } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java index 0d99607da3..35894f5b40 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java @@ -181,7 +181,7 @@ public class ChannelHandlerAppender extends ChannelHandlerAdapter { public void handlerAdded(ChannelHandlerContext ctx) throws Exception { added = true; - DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx; DefaultChannelPipeline pipeline = (DefaultChannelPipeline) dctx.pipeline(); String name = dctx.name(); try { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index cd192003d3..fe4abd6af0 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -1,557 +1,38 @@ /* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ +* Copyright 2014 The Netty Project +* +* The Netty Project licenses this file to you under the Apache License, +* version 2.0 (the "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at: +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations +* under the License. +*/ package io.netty.channel; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelHandler.Skip; -import io.netty.util.Attribute; -import io.netty.util.AttributeKey; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.ResourceLeakHint; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.StringUtil; - -import java.net.SocketAddress; -import java.util.WeakHashMap; - -final class DefaultChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { - - // This class keeps an integer member field 'skipFlags' whose each bit tells if the corresponding handler method - // is annotated with @Skip. 'skipFlags' is retrieved in runtime via the reflection API and is cached. - // The following constants signify which bit of 'skipFlags' corresponds to which handler method: - - static final int MASK_HANDLER_ADDED = 1; - static final int MASK_HANDLER_REMOVED = 1 << 1; - - private static final int MASK_EXCEPTION_CAUGHT = 1 << 2; - private static final int MASK_CHANNEL_REGISTERED = 1 << 3; - private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4; - private static final int MASK_CHANNEL_ACTIVE = 1 << 5; - private static final int MASK_CHANNEL_INACTIVE = 1 << 6; - private static final int MASK_CHANNEL_READ = 1 << 7; - private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8; - private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9; - private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10; - - private static final int MASK_BIND = 1 << 11; - private static final int MASK_CONNECT = 1 << 12; - private static final int MASK_DISCONNECT = 1 << 13; - private static final int MASK_CLOSE = 1 << 14; - private static final int MASK_DEREGISTER = 1 << 15; - private static final int MASK_READ = 1 << 16; - private static final int MASK_WRITE = 1 << 17; - private static final int MASK_FLUSH = 1 << 18; - - private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT | - MASK_CHANNEL_REGISTERED | - MASK_CHANNEL_UNREGISTERED | - MASK_CHANNEL_ACTIVE | - MASK_CHANNEL_INACTIVE | - MASK_CHANNEL_READ | - MASK_CHANNEL_READ_COMPLETE | - MASK_CHANNEL_WRITABILITY_CHANGED | - MASK_USER_EVENT_TRIGGERED; - - private static final int MASKGROUP_OUTBOUND = MASK_BIND | - MASK_CONNECT | - MASK_DISCONNECT | - MASK_CLOSE | - MASK_DEREGISTER | - MASK_READ | - MASK_WRITE | - MASK_FLUSH; - - /** - * Cache the result of the costly generation of {@link #skipFlags} in the partitioned synchronized - * {@link WeakHashMap}. - */ - @SuppressWarnings("unchecked") - private static final WeakHashMap, Integer>[] skipFlagsCache = - new WeakHashMap[Runtime.getRuntime().availableProcessors()]; - - static { - for (int i = 0; i < skipFlagsCache.length; i ++) { - skipFlagsCache[i] = new WeakHashMap, Integer>(); - } - } - - /** - * Returns an integer bitset that tells which handler methods were annotated with {@link Skip}. - * It gets the value from {@link #skipFlagsCache} if an handler of the same type were queried before. - * Otherwise, it delegates to {@link #skipFlags0(Class)} to get it. - */ - private static int skipFlags(ChannelHandler handler) { - WeakHashMap, Integer> cache = - skipFlagsCache[(int) (Thread.currentThread().getId() % skipFlagsCache.length)]; - Class handlerType = handler.getClass(); - int flagsVal; - synchronized (cache) { - Integer flags = cache.get(handlerType); - if (flags != null) { - flagsVal = flags; - } else { - flagsVal = skipFlags0(handlerType); - cache.put(handlerType, Integer.valueOf(flagsVal)); - } - } - - return flagsVal; - } - - /** - * Determines the {@link #skipFlags} of the specified {@code handlerType} using the reflection API. - */ - private static int skipFlags0(Class handlerType) { - int flags = 0; - try { - if (isSkippable(handlerType, "handlerAdded")) { - flags |= MASK_HANDLER_ADDED; - } - if (isSkippable(handlerType, "handlerRemoved")) { - flags |= MASK_HANDLER_REMOVED; - } - if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) { - flags |= MASK_EXCEPTION_CAUGHT; - } - if (isSkippable(handlerType, "channelRegistered")) { - flags |= MASK_CHANNEL_REGISTERED; - } - if (isSkippable(handlerType, "channelUnregistered")) { - flags |= MASK_CHANNEL_UNREGISTERED; - } - if (isSkippable(handlerType, "channelActive")) { - flags |= MASK_CHANNEL_ACTIVE; - } - if (isSkippable(handlerType, "channelInactive")) { - flags |= MASK_CHANNEL_INACTIVE; - } - if (isSkippable(handlerType, "channelRead", Object.class)) { - flags |= MASK_CHANNEL_READ; - } - if (isSkippable(handlerType, "channelReadComplete")) { - flags |= MASK_CHANNEL_READ_COMPLETE; - } - if (isSkippable(handlerType, "channelWritabilityChanged")) { - flags |= MASK_CHANNEL_WRITABILITY_CHANGED; - } - if (isSkippable(handlerType, "userEventTriggered", Object.class)) { - flags |= MASK_USER_EVENT_TRIGGERED; - } - if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) { - flags |= MASK_BIND; - } - if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) { - flags |= MASK_CONNECT; - } - if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) { - flags |= MASK_DISCONNECT; - } - if (isSkippable(handlerType, "close", ChannelPromise.class)) { - flags |= MASK_CLOSE; - } - if (isSkippable(handlerType, "deregister", ChannelPromise.class)) { - flags |= MASK_DEREGISTER; - } - if (isSkippable(handlerType, "read")) { - flags |= MASK_READ; - } - if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) { - flags |= MASK_WRITE; - } - if (isSkippable(handlerType, "flush")) { - flags |= MASK_FLUSH; - } - } catch (Exception e) { - // Should never reach here. - PlatformDependent.throwException(e); - } - - return flags; - } - - @SuppressWarnings("rawtypes") - private static boolean isSkippable( - Class handlerType, String methodName, Class... paramTypes) throws Exception { - - Class[] newParamTypes = new Class[paramTypes.length + 1]; - newParamTypes[0] = ChannelHandlerContext.class; - System.arraycopy(paramTypes, 0, newParamTypes, 1, paramTypes.length); - - return handlerType.getMethod(methodName, newParamTypes).isAnnotationPresent(Skip.class); - } - - volatile DefaultChannelHandlerContext next; - volatile DefaultChannelHandlerContext prev; - - private final AbstractChannel channel; - private final DefaultChannelPipeline pipeline; - private final String name; +final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; - private boolean removed; - - final int skipFlags; - - // Will be set to null if no child executor should be used, otherwise it will be set to the - // child executor. - final ChannelHandlerInvoker invoker; - private ChannelFuture succeededFuture; - - // Lazily instantiated tasks used to trigger events to a handler with different executor. - // These needs to be volatile as otherwise an other Thread may see an half initialized instance. - // See the JMM for more details - volatile Runnable invokeChannelReadCompleteTask; - volatile Runnable invokeReadTask; - volatile Runnable invokeFlushTask; - volatile Runnable invokeChannelWritableStateChangedTask; DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) { + super(pipeline, invoker, name, skipFlags(checkNull(handler))); + this.handler = handler; + } - if (name == null) { - throw new NullPointerException("name"); - } + private static ChannelHandler checkNull(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } - - channel = pipeline.channel; - this.pipeline = pipeline; - this.name = name; - this.handler = handler; - this.invoker = invoker; - - skipFlags = skipFlags(handler); - } - - /** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */ - void teardown() { - EventExecutor executor = executor(); - if (executor.inEventLoop()) { - teardown0(); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - teardown0(); - } - }); - } - } - - private void teardown0() { - DefaultChannelHandlerContext prev = this.prev; - if (prev != null) { - synchronized (pipeline) { - pipeline.remove0(this); - } - prev.teardown(); - } - } - - @Override - public Channel channel() { - return channel; - } - - @Override - public ChannelPipeline pipeline() { - return pipeline; - } - - @Override - public ByteBufAllocator alloc() { - return channel().config().getAllocator(); - } - - @Override - public EventExecutor executor() { - return invoker().executor(); - } - - @Override - public ChannelHandlerInvoker invoker() { - if (invoker == null) { - return channel.unsafe().invoker(); - } - return invoker; + return handler; } @Override public ChannelHandler handler() { return handler; } - - @Override - public String name() { - return name; - } - - @Override - public Attribute attr(AttributeKey key) { - return channel.attr(key); - } - - @Override - public boolean hasAttr(AttributeKey key) { - return channel.hasAttr(key); - } - - @Override - public ChannelHandlerContext fireChannelRegistered() { - DefaultChannelHandlerContext next = findContextInbound(); - next.invoker().invokeChannelRegistered(next); - return this; - } - - @Override - public ChannelHandlerContext fireChannelUnregistered() { - DefaultChannelHandlerContext next = findContextInbound(); - next.invoker().invokeChannelUnregistered(next); - return this; - } - - @Override - public ChannelHandlerContext fireChannelActive() { - DefaultChannelHandlerContext next = findContextInbound(); - next.invoker().invokeChannelActive(next); - return this; - } - - @Override - public ChannelHandlerContext fireChannelInactive() { - DefaultChannelHandlerContext next = findContextInbound(); - next.invoker().invokeChannelInactive(next); - return this; - } - - @Override - public ChannelHandlerContext fireExceptionCaught(Throwable cause) { - DefaultChannelHandlerContext next = findContextInbound(); - next.invoker().invokeExceptionCaught(next, cause); - return this; - } - - @Override - public ChannelHandlerContext fireUserEventTriggered(Object event) { - DefaultChannelHandlerContext next = findContextInbound(); - next.invoker().invokeUserEventTriggered(next, event); - return this; - } - - @Override - public ChannelHandlerContext fireChannelRead(Object msg) { - DefaultChannelHandlerContext next = findContextInbound(); - ReferenceCountUtil.touch(msg, next); - next.invoker().invokeChannelRead(next, msg); - return this; - } - - @Override - public ChannelHandlerContext fireChannelReadComplete() { - DefaultChannelHandlerContext next = findContextInbound(); - next.invoker().invokeChannelReadComplete(next); - return this; - } - - @Override - public ChannelHandlerContext fireChannelWritabilityChanged() { - DefaultChannelHandlerContext next = findContextInbound(); - next.invoker().invokeChannelWritabilityChanged(next); - return this; - } - - @Override - public ChannelFuture bind(SocketAddress localAddress) { - return bind(localAddress, newPromise()); - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress) { - return connect(remoteAddress, newPromise()); - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { - return connect(remoteAddress, localAddress, newPromise()); - } - - @Override - public ChannelFuture disconnect() { - return disconnect(newPromise()); - } - - @Override - public ChannelFuture close() { - return close(newPromise()); - } - - @Override - public ChannelFuture deregister() { - return deregister(newPromise()); - } - - @Override - public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { - DefaultChannelHandlerContext next = findContextOutbound(); - next.invoker().invokeBind(next, localAddress, promise); - return promise; - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { - return connect(remoteAddress, null, promise); - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - DefaultChannelHandlerContext next = findContextOutbound(); - next.invoker().invokeConnect(next, remoteAddress, localAddress, promise); - return promise; - } - - @Override - public ChannelFuture disconnect(ChannelPromise promise) { - if (!channel().metadata().hasDisconnect()) { - return close(promise); - } - - DefaultChannelHandlerContext next = findContextOutbound(); - next.invoker().invokeDisconnect(next, promise); - return promise; - } - - @Override - public ChannelFuture close(ChannelPromise promise) { - DefaultChannelHandlerContext next = findContextOutbound(); - next.invoker().invokeClose(next, promise); - return promise; - } - - @Override - public ChannelFuture deregister(ChannelPromise promise) { - DefaultChannelHandlerContext next = findContextOutbound(); - next.invoker().invokeDeregister(next, promise); - return promise; - } - - @Override - public ChannelHandlerContext read() { - DefaultChannelHandlerContext next = findContextOutbound(); - next.invoker().invokeRead(next); - return this; - } - - @Override - public ChannelFuture write(Object msg) { - return write(msg, newPromise()); - } - - @Override - public ChannelFuture write(Object msg, ChannelPromise promise) { - DefaultChannelHandlerContext next = findContextOutbound(); - ReferenceCountUtil.touch(msg, next); - next.invoker().invokeWrite(next, msg, promise); - return promise; - } - - @Override - public ChannelHandlerContext flush() { - DefaultChannelHandlerContext next = findContextOutbound(); - next.invoker().invokeFlush(next); - return this; - } - - @Override - public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - DefaultChannelHandlerContext next; - next = findContextOutbound(); - ReferenceCountUtil.touch(msg, next); - next.invoker().invokeWrite(next, msg, promise); - next = findContextOutbound(); - next.invoker().invokeFlush(next); - return promise; - } - - @Override - public ChannelFuture writeAndFlush(Object msg) { - return writeAndFlush(msg, newPromise()); - } - - @Override - public ChannelPromise newPromise() { - return new DefaultChannelPromise(channel(), executor()); - } - - @Override - public ChannelProgressivePromise newProgressivePromise() { - return new DefaultChannelProgressivePromise(channel(), executor()); - } - - @Override - public ChannelFuture newSucceededFuture() { - ChannelFuture succeededFuture = this.succeededFuture; - if (succeededFuture == null) { - this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor()); - } - return succeededFuture; - } - - @Override - public ChannelFuture newFailedFuture(Throwable cause) { - return new FailedChannelFuture(channel(), executor(), cause); - } - - private DefaultChannelHandlerContext findContextInbound() { - DefaultChannelHandlerContext ctx = this; - do { - ctx = ctx.next; - } while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND); - return ctx; - } - - private DefaultChannelHandlerContext findContextOutbound() { - DefaultChannelHandlerContext ctx = this; - do { - ctx = ctx.prev; - } while ((ctx.skipFlags & MASKGROUP_OUTBOUND) == MASKGROUP_OUTBOUND); - return ctx; - } - - @Override - public ChannelPromise voidPromise() { - return channel.voidPromise(); - } - - void setRemoved() { - removed = true; - } - - @Override - public boolean isRemoved() { - return removed; - } - - @Override - public String toHintString() { - return '\'' + name + "' will handle the message from this point."; - } - - @Override - public String toString() { - return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')'; - } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java index 5f976e6810..7cafd6a7a7 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java @@ -165,7 +165,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { if (executor.inEventLoop()) { invokeChannelReadCompleteNow(ctx); } else { - DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx; Runnable task = dctx.invokeChannelReadCompleteTask; if (task == null) { dctx.invokeChannelReadCompleteTask = task = new Runnable() { @@ -184,7 +184,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { if (executor.inEventLoop()) { invokeChannelWritabilityChangedNow(ctx); } else { - DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx; Runnable task = dctx.invokeChannelWritableStateChangedTask; if (task == null) { dctx.invokeChannelWritableStateChangedTask = task = new Runnable() { @@ -307,7 +307,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { if (executor.inEventLoop()) { invokeReadNow(ctx); } else { - DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx; Runnable task = dctx.invokeReadTask; if (task == null) { dctx.invokeReadTask = task = new Runnable() { @@ -353,7 +353,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { if (executor.inEventLoop()) { invokeFlushNow(ctx); } else { - DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx; Runnable task = dctx.invokeFlushTask; if (task == null) { dctx.invokeFlushTask = task = new Runnable() { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index c5d8bb8d89..753135e8ca 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -57,11 +57,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannel channel; - final DefaultChannelHandlerContext head; - final DefaultChannelHandlerContext tail; + final AbstractChannelHandlerContext head; + final AbstractChannelHandlerContext tail; - private final Map name2ctx = - new HashMap(4); + private final Map name2ctx = + new HashMap(4); /** * @see #findInvoker(EventExecutorGroup) @@ -74,11 +74,8 @@ final class DefaultChannelPipeline implements ChannelPipeline { } this.channel = channel; - TailHandler tailHandler = new TailHandler(); - tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler); - - HeadHandler headHandler = new HeadHandler(channel.unsafe()); - head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler); + tail = new TailContext(this); + head = new HeadContext(this); head.next = tail; tail.prev = head; @@ -112,10 +109,10 @@ final class DefaultChannelPipeline implements ChannelPipeline { return this; } - private void addFirst0(String name, DefaultChannelHandlerContext newCtx) { + private void addFirst0(String name, AbstractChannelHandlerContext newCtx) { checkMultiplicity(newCtx); - DefaultChannelHandlerContext nextCtx = head.next; + AbstractChannelHandlerContext nextCtx = head.next; newCtx.prev = head; newCtx.next = nextCtx; head.next = newCtx; @@ -149,10 +146,10 @@ final class DefaultChannelPipeline implements ChannelPipeline { return this; } - private void addLast0(final String name, DefaultChannelHandlerContext newCtx) { + private void addLast0(final String name, AbstractChannelHandlerContext newCtx) { checkMultiplicity(newCtx); - DefaultChannelHandlerContext prev = tail.prev; + AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; @@ -171,7 +168,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { synchronized (this) { - DefaultChannelHandlerContext ctx = getContextOrDie(baseName); + AbstractChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler)); } @@ -182,14 +179,15 @@ final class DefaultChannelPipeline implements ChannelPipeline { public ChannelPipeline addBefore( ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) { synchronized (this) { - DefaultChannelHandlerContext ctx = getContextOrDie(baseName); + AbstractChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler)); } return this; } - private void addBefore0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { + private void addBefore0( + final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { checkMultiplicity(newCtx); newCtx.prev = ctx.prev; @@ -210,7 +208,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { synchronized (this) { - DefaultChannelHandlerContext ctx = getContextOrDie(baseName); + AbstractChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler)); } @@ -221,14 +219,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { public ChannelPipeline addAfter( ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) { synchronized (this) { - DefaultChannelHandlerContext ctx = getContextOrDie(baseName); + AbstractChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler)); } return this; } - private void addAfter0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { + private void addAfter0(final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { checkDuplicateName(name); checkMultiplicity(newCtx); @@ -367,7 +365,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { synchronized (cache) { name = cache.get(handlerType); if (name == null) { - name = StringUtil.simpleClassName(handlerType) + "#0"; + name = generateName0(handlerType); cache.put(handlerType, name); } } @@ -390,6 +388,10 @@ final class DefaultChannelPipeline implements ChannelPipeline { return name; } + private static String generateName0(Class handlerType) { + return StringUtil.simpleClassName(handlerType) + "#0"; + } + @Override public ChannelPipeline remove(ChannelHandler handler) { remove(getContextOrDie(handler)); @@ -407,10 +409,10 @@ final class DefaultChannelPipeline implements ChannelPipeline { return (T) remove(getContextOrDie(handlerType)).handler(); } - private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) { + private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { assert ctx != head && ctx != tail; - DefaultChannelHandlerContext context; + AbstractChannelHandlerContext context; Future future; synchronized (this) { @@ -438,9 +440,9 @@ final class DefaultChannelPipeline implements ChannelPipeline { return context; } - void remove0(DefaultChannelHandlerContext ctx) { - DefaultChannelHandlerContext prev = ctx.prev; - DefaultChannelHandlerContext next = ctx.next; + void remove0(AbstractChannelHandlerContext ctx) { + AbstractChannelHandlerContext prev = ctx.prev; + AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; name2ctx.remove(ctx.name()); @@ -482,7 +484,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } private ChannelHandler replace( - final DefaultChannelHandlerContext ctx, final String newName, + final AbstractChannelHandlerContext ctx, final String newName, ChannelHandler newHandler) { assert ctx != head && ctx != tail; @@ -494,7 +496,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { checkDuplicateName(newName); } - final DefaultChannelHandlerContext newCtx = + final AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { @@ -520,12 +522,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { return ctx.handler(); } - private void replace0(DefaultChannelHandlerContext oldCtx, String newName, - DefaultChannelHandlerContext newCtx) { + private void replace0(AbstractChannelHandlerContext oldCtx, String newName, + AbstractChannelHandlerContext newCtx) { checkMultiplicity(newCtx); - DefaultChannelHandlerContext prev = oldCtx.prev; - DefaultChannelHandlerContext next = oldCtx.next; + AbstractChannelHandlerContext prev = oldCtx.prev; + AbstractChannelHandlerContext next = oldCtx.next; newCtx.prev = prev; newCtx.next = next; @@ -565,8 +567,8 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - private void callHandlerAdded(final DefaultChannelHandlerContext ctx) { - if ((ctx.skipFlags & DefaultChannelHandlerContext.MASK_HANDLER_ADDED) != 0) { + private void callHandlerAdded(final AbstractChannelHandlerContext ctx) { + if ((ctx.skipFlags & AbstractChannelHandlerContext.MASK_HANDLER_ADDED) != 0) { return; } @@ -582,7 +584,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { callHandlerAdded0(ctx); } - private void callHandlerAdded0(final DefaultChannelHandlerContext ctx) { + private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); } catch (Throwable t) { @@ -608,8 +610,8 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - private void callHandlerRemoved(final DefaultChannelHandlerContext ctx) { - if ((ctx.skipFlags & DefaultChannelHandlerContext.MASK_HANDLER_REMOVED) != 0) { + private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) { + if ((ctx.skipFlags & AbstractChannelHandlerContext.MASK_HANDLER_REMOVED) != 0) { return; } @@ -625,7 +627,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { callHandlerRemoved0(ctx); } - private void callHandlerRemoved0(final DefaultChannelHandlerContext ctx) { + private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) { // Notify the complete removal. try { ctx.handler().handlerRemoved(ctx); @@ -674,7 +676,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelHandlerContext firstContext() { - DefaultChannelHandlerContext first = head.next; + AbstractChannelHandlerContext first = head.next; if (first == tail) { return null; } @@ -683,7 +685,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelHandler last() { - DefaultChannelHandlerContext last = tail.prev; + AbstractChannelHandlerContext last = tail.prev; if (last == head) { return null; } @@ -692,7 +694,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelHandlerContext lastContext() { - DefaultChannelHandlerContext last = tail.prev; + AbstractChannelHandlerContext last = tail.prev; if (last == head) { return null; } @@ -737,7 +739,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { throw new NullPointerException("handler"); } - DefaultChannelHandlerContext ctx = head.next; + AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == null) { @@ -758,7 +760,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { throw new NullPointerException("handlerType"); } - DefaultChannelHandlerContext ctx = head.next; + AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == null) { return null; @@ -773,7 +775,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public List names() { List list = new ArrayList(); - DefaultChannelHandlerContext ctx = head.next; + AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == null) { return list; @@ -786,7 +788,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public Map toMap() { Map map = new LinkedHashMap(); - DefaultChannelHandlerContext ctx = head.next; + AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == tail) { return map; @@ -809,7 +811,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { StringBuilder buf = new StringBuilder(); buf.append(StringUtil.simpleClassName(this)); buf.append('{'); - DefaultChannelHandlerContext ctx = head.next; + AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == tail) { break; @@ -1006,8 +1008,8 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - private DefaultChannelHandlerContext getContextOrDie(String name) { - DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(name); + private AbstractChannelHandlerContext getContextOrDie(String name) { + AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name); if (ctx == null) { throw new NoSuchElementException(name); } else { @@ -1015,8 +1017,8 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) { - DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handler); + private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) { + AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); if (ctx == null) { throw new NoSuchElementException(handler.getClass().getName()); } else { @@ -1024,8 +1026,8 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - private DefaultChannelHandlerContext getContextOrDie(Class handlerType) { - DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handlerType); + private AbstractChannelHandlerContext getContextOrDie(Class handlerType) { + AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType); if (ctx == null) { throw new NoSuchElementException(handlerType.getName()); } else { @@ -1033,8 +1035,18 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - // A special catch-all handler that handles both bytes and messages. - static final class TailHandler extends ChannelHandlerAdapter { + static final class TailContext extends AbstractChannelHandlerContext implements ChannelHandler { + private static final int SKIP_FLAGS = skipFlags0(TailContext.class); + private static final String TAIL_NAME = generateName0(TailContext.class); + + TailContext(DefaultChannelPipeline pipeline) { + super(pipeline, null, TAIL_NAME, SKIP_FLAGS); + } + + @Override + public ChannelHandler handler() { + return this; + } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @@ -1074,14 +1086,80 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } + + @Skip + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } + + @Skip + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } + + @Skip + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) + throws Exception { + ctx.bind(localAddress, promise); + } + + @Skip + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) throws Exception { + ctx.connect(remoteAddress, localAddress, promise); + } + + @Skip + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.disconnect(promise); + } + + @Skip + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.close(promise); + } + + @Skip + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.deregister(promise); + } + + @Skip + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + ctx.read(); + } + + @Skip + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + ctx.write(msg, promise); + } + + @Skip + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } } - static final class HeadHandler extends ChannelHandlerAdapter { + static final class HeadContext extends AbstractChannelHandlerContext implements ChannelHandler { + private static final int SKIP_FLAGS = skipFlags0(HeadContext.class); + private static final String HEAD_NAME = generateName0(HeadContext.class); private final Unsafe unsafe; - HeadHandler(Unsafe unsafe) { - this.unsafe = unsafe; + HeadContext(DefaultChannelPipeline pipeline) { + super(pipeline, null, HEAD_NAME, SKIP_FLAGS); + unsafe = pipeline.channel().unsafe(); + } + + @Override + public ChannelHandler handler() { + return this; } @Override @@ -1128,5 +1206,67 @@ final class DefaultChannelPipeline implements ChannelPipeline { public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } + + @Skip + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } + + @Skip + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } + + @Skip + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.fireExceptionCaught(cause); + } + + @Skip + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelRegistered(); + } + + @Skip + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelUnregistered(); + } + + @Skip + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelActive(); + } + + @Skip + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelInactive(); + } + + @Skip + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ctx.fireChannelRead(msg); + } + + @Skip + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelReadComplete(); + } + + @Skip + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + ctx.fireUserEventTriggered(evt); + } + + @Skip + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelWritabilityChanged(); + } } } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 9958fe20e9..785b84dec8 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -259,7 +259,7 @@ public class DefaultChannelPipelineTest { pipeline.addBefore("1", "0", newHandler()); pipeline.addAfter("10", "11", newHandler()); - DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext(); + AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext(); assertNotNull(ctx); while (ctx != null) { int i = toInt(ctx.name()); @@ -548,8 +548,8 @@ public class DefaultChannelPipelineTest { assertNull(pipeline.last()); } - private static int next(DefaultChannelHandlerContext ctx) { - DefaultChannelHandlerContext next = ctx.next; + private static int next(AbstractChannelHandlerContext ctx) { + AbstractChannelHandlerContext next = ctx.next; if (next == null) { return Integer.MAX_VALUE; } @@ -566,7 +566,7 @@ public class DefaultChannelPipelineTest { } private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) { - DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext(); + AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext(); int handlerNumber = 0; while (ctx != ((DefaultChannelPipeline) pipeline).tail) { handlerNumber++;