diff --git a/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineBenchmark.java new file mode 100644 index 0000000000..2747ccfd98 --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineBenchmark.java @@ -0,0 +1,84 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.channel; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.microbench.util.AbstractMicrobenchmark; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@State(Scope.Benchmark) +public class DefaultChannelPipelineBenchmark extends AbstractMicrobenchmark { + + private static final ChannelInboundHandler NOOP_HANDLER = new ChannelInboundHandlerAdapter() { + @Override + public boolean isSharable() { + return true; + } + }; + + private static final ChannelInboundHandler CONSUMING_HANDLER = new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + // NOOP + } + + @Override + public boolean isSharable() { + return true; + } + }; + + @Param({ "4" }) + public int extraHandlers; + + private ChannelPipeline pipeline; + + @Setup(Level.Iteration) + public void setup() { + pipeline = new EmbeddedChannel().pipeline(); + for (int i = 0; i < extraHandlers; i++) { + pipeline.addLast(NOOP_HANDLER); + } + pipeline.addLast(CONSUMING_HANDLER); + } + + @TearDown + public void tearDown() { + pipeline.channel().close(); + } + + @Benchmark + public void propagateEvent(Blackhole hole) { + for (int i = 0; i < 100; i++) { + hole.consume(pipeline.fireChannelReadComplete()); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 8019e97723..913edecc08 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -23,7 +23,9 @@ import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.OrderedEventExecutor; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ObjectUtil; @@ -33,6 +35,10 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @@ -63,8 +69,42 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap */ private static final int INIT = 0; - private final boolean inbound; - private final boolean outbound; + // Using to mask which methods must be called for a ChannelHandler. + private static final int MASK_EXCEPTION_CAUGHT = 1; + private static final int MASK_CHANNEL_REGISTERED = 1 << 1; + private static final int MASK_CHANNEL_UNREGISTERED = 1 << 2; + private static final int MASK_CHANNEL_ACTIVE = 1 << 3; + private static final int MASK_CHANNEL_INACTIVE = 1 << 4; + private static final int MASK_CHANNEL_READ = 1 << 5; + private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6; + private static final int MASK_USER_EVENT_TRIGGERED = 1 << 7; + private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8; + private static final int MASK_BIND = 1 << 9; + private static final int MASK_CONNECT = 1 << 10; + private static final int MASK_DISCONNECT = 1 << 11; + private static final int MASK_CLOSE = 1 << 12; + private static final int MASK_REGISTER = 1 << 13; + private static final int MASK_DEREGISTER = 1 << 14; + private static final int MASK_READ = 1 << 15; + private static final int MASK_WRITE = 1 << 16; + private static final int MASK_FLUSH = 1 << 17; + + private static final int MASK_ALL_INBOUND = MASK_CHANNEL_REGISTERED | + MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ | + MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED; + private static final int MASK_ALL_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT | + MASK_CLOSE | MASK_REGISTER | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH; + + private static final FastThreadLocal, Integer>> MASKS = + new FastThreadLocal, Integer>>() { + @Override + protected Map, Integer> initialValue() { + return new WeakHashMap, Integer>(32); + } + }; + + private final int executionMask; + private final DefaultChannelPipeline pipeline; private final String name; private final boolean ordered; @@ -84,16 +124,122 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap private volatile int handlerState = INIT; AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, - boolean inbound, boolean outbound) { + Class handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; - this.inbound = inbound; - this.outbound = outbound; + this.executionMask = mask(handlerClass); // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; } + /** + * Return the {@code executionMask}. + */ + private static int mask(Class clazz) { + // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast + // lookup in the future. + Map, Integer> cache = MASKS.get(); + Integer mask = cache.get(clazz); + if (mask == null) { + mask = mask0(clazz); + cache.put(clazz, mask); + } + return mask; + } + + /** + * Calculate the {@code executionMask}. + */ + private static int mask0(Class handlerType) { + int mask = MASK_EXCEPTION_CAUGHT; + try { + if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) { + mask &= ~MASK_EXCEPTION_CAUGHT; + } + + if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) { + mask |= MASK_ALL_INBOUND; + + if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_REGISTERED; + } + if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_UNREGISTERED; + } + if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_ACTIVE; + } + if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_INACTIVE; + } + if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) { + mask &= ~MASK_CHANNEL_READ; + } + if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_READ_COMPLETE; + } + if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED; + } + if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) { + mask &= ~MASK_USER_EVENT_TRIGGERED; + } + } + if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) { + mask |= MASK_ALL_OUTBOUND; + + if (isSkippable(handlerType, "bind", ChannelHandlerContext.class, + SocketAddress.class, ChannelPromise.class)) { + mask &= ~MASK_BIND; + } + if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class, + SocketAddress.class, ChannelPromise.class)) { + mask &= ~MASK_CONNECT; + } + if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) { + mask &= ~MASK_DISCONNECT; + } + if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) { + mask &= ~MASK_CLOSE; + } + if (isSkippable(handlerType, "register", ChannelHandlerContext.class, ChannelPromise.class)) { + mask &= ~MASK_REGISTER; + } + if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) { + mask &= ~MASK_DEREGISTER; + } + if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) { + mask &= ~MASK_READ; + } + if (isSkippable(handlerType, "write", ChannelHandlerContext.class, + Object.class, ChannelPromise.class)) { + mask &= ~MASK_WRITE; + } + if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) { + mask &= ~MASK_FLUSH; + } + } + } catch (Exception e) { + // Should never reach here. + PlatformDependent.throwException(e); + } + + return mask; + } + + @SuppressWarnings("rawtypes") + private static boolean isSkippable( + final Class handlerType, final String methodName, final Class... paramTypes) throws Exception { + + return AccessController.doPrivileged(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(ChannelHandler.Skip.class); + } + }); + } + @Override public Channel channel() { return pipeline.channel(); @@ -125,7 +271,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelRegistered() { - invokeChannelRegistered(findContextInbound()); + invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED)); return this; } @@ -157,7 +303,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelUnregistered() { - invokeChannelUnregistered(findContextInbound()); + invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED)); return this; } @@ -189,7 +335,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelActive() { - invokeChannelActive(findContextInbound()); + invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE)); return this; } @@ -221,7 +367,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelInactive() { - invokeChannelInactive(findContextInbound()); + invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE)); return this; } @@ -253,7 +399,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { - invokeExceptionCaught(next, cause); + invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause); return this; } @@ -304,7 +450,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireUserEventTriggered(final Object event) { - invokeUserEventTriggered(findContextInbound(), event); + invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event); return this; } @@ -337,7 +483,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelRead(final Object msg) { - invokeChannelRead(findContextInbound(), msg); + invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this; } @@ -370,7 +516,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelReadComplete() { - invokeChannelReadComplete(findContextInbound()); + invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE)); return this; } @@ -406,7 +552,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelWritabilityChanged() { - invokeChannelWritabilityChanged(findContextInbound()); + invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED)); return this; } @@ -485,7 +631,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); @@ -529,7 +675,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); @@ -563,7 +709,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // Translate disconnect to close if the channel has no notion of disconnect-reconnect. @@ -607,7 +753,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeClose(promise); @@ -642,7 +788,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_REGISTER); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRegister(promise); @@ -677,7 +823,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeDeregister(promise); @@ -707,7 +853,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext read() { - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); @@ -783,7 +929,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext flush() { - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeFlush(); @@ -846,7 +992,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void write(Object msg, boolean flush, ChannelPromise promise) { - AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { @@ -977,19 +1123,19 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return false; } - private AbstractChannelHandlerContext findContextInbound() { + private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; - } while (!ctx.inbound); + } while ((ctx.executionMask & mask) == 0); return ctx; } - private AbstractChannelHandlerContext findContextOutbound() { + private AbstractChannelHandlerContext findContextOutbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; - } while (!ctx.outbound); + } while ((ctx.executionMask & mask) == 0); return ctx; } diff --git a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java index 0b3fc2d28b..bc3a915e65 100644 --- a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java @@ -32,6 +32,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -44,6 +45,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -56,6 +58,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { @@ -68,6 +71,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); @@ -79,6 +83,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.register(promise); @@ -90,6 +95,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); @@ -101,6 +107,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); @@ -112,6 +119,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); @@ -123,6 +131,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandler.java b/transport/src/main/java/io/netty/channel/ChannelHandler.java index f9401080eb..aca5901a8b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandler.java @@ -215,4 +215,30 @@ public interface ChannelHandler { @interface Sharable { // no value } + + /** + * Indicates that the annotated event handler method in {@link ChannelHandler} will not be invoked by + * {@link ChannelPipeline}. This annotation is only useful when your handler method implementation + * only passes the event through to the next handler, like the following: + * + *
+     * {@code @Skip}
+     * {@code @Override}
+     * public void channelActive({@link ChannelHandlerContext} ctx) {
+     *     ctx.fireChannelActive(); // do nothing but passing through to the next handler
+     * }
+     * 
+ * + *

+ * Note that this annotation is not {@linkplain Inherited inherited}. If you override a method annotated with + * {@link Skip}, it will not be skipped anymore. Similarly, you can override a method not annotated with + * {@link Skip} and simply pass the event through to the next handler, which reverses the behavior of the + * supertype. + *

+ */ + @Target(ElementType.METHOD) + @Retention(RetentionPolicy.RUNTIME) + @interface Skip { + // no value + } } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index aadc691d66..722261ed32 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -82,6 +82,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler { * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java index d0a1d2d766..0acc18a4d3 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java @@ -37,6 +37,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); @@ -48,6 +49,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); @@ -59,6 +61,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); @@ -70,6 +73,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); @@ -81,6 +85,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); @@ -92,6 +97,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); @@ -103,6 +109,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); @@ -114,6 +121,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); @@ -125,6 +133,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index f4cf8a130d..674a6ca7d3 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -29,6 +29,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -41,6 +42,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -53,6 +55,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { @@ -65,6 +68,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { @@ -77,6 +81,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.register(promise); @@ -88,6 +93,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); @@ -99,6 +105,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); @@ -110,6 +117,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); @@ -121,6 +129,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 58454b817a..3a3fe3412b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -16,6 +16,7 @@ package io.netty.channel; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.ObjectUtil; final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { @@ -23,10 +24,7 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { - super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); - if (handler == null) { - throw new NullPointerException("handler"); - } + super(pipeline, executor, name, ObjectUtil.checkNotNull(handler, "handler").getClass()); this.handler = handler; } @@ -34,12 +32,4 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { public ChannelHandler handler() { return handler; } - - private static boolean isInbound(ChannelHandler handler) { - return handler instanceof ChannelInboundHandler; - } - - private static boolean isOutbound(ChannelHandler handler) { - return handler instanceof ChannelOutboundHandler; - } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 923e821722..492842299b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.WeakHashMap; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** @@ -1149,7 +1148,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { - super(pipeline, null, TAIL_NAME, true, false); + super(pipeline, null, TAIL_NAME, TailContext.class); setAddComplete(); } @@ -1212,7 +1211,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { - super(pipeline, null, HEAD_NAME, true, true); + super(pipeline, null, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @@ -1282,11 +1281,13 @@ public class DefaultChannelPipeline implements ChannelPipeline { unsafe.flush(); } + @Skip @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } + @Skip @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); @@ -1302,31 +1303,37 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } + @Skip @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } + @Skip @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } + @Skip @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } + @Skip @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } + @Skip @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } + @Skip @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 970f160ced..c793f5a2f1 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -43,6 +43,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Test; +import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -1197,6 +1198,388 @@ public class DefaultChannelPipelineTest { } } + @Test + public void testSkipHandlerMethodsIfAnnotated() { + EmbeddedChannel channel = new EmbeddedChannel(true); + ChannelPipeline pipeline = channel.pipeline(); + + final class SkipHandler implements ChannelInboundHandler, ChannelOutboundHandler { + private int state = 2; + private Error errorRef; + + private void fail() { + errorRef = new AssertionError("Method should never been called"); + } + + @Skip + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { + fail(); + ctx.bind(localAddress, promise); + } + + @Skip + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) { + fail(); + ctx.connect(remoteAddress, localAddress, promise); + } + + @Skip + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) { + fail(); + ctx.disconnect(promise); + } + + @Skip + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) { + fail(); + ctx.close(promise); + } + + @Skip + @Override + public void register(ChannelHandlerContext ctx, ChannelPromise promise) { + fail(); + ctx.register(promise); + } + + @Skip + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) { + fail(); + ctx.deregister(promise); + } + + @Skip + @Override + public void read(ChannelHandlerContext ctx) { + fail(); + ctx.read(); + } + + @Skip + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + fail(); + ctx.write(msg, promise); + } + + @Skip + @Override + public void flush(ChannelHandlerContext ctx) { + fail(); + ctx.flush(); + } + + @Skip + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelRegistered(); + } + + @Skip + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelUnregistered(); + } + + @Skip + @Override + public void channelActive(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelActive(); + } + + @Skip + @Override + public void channelInactive(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelInactive(); + } + + @Skip + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + fail(); + ctx.fireChannelRead(msg); + } + + @Skip + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelReadComplete(); + } + + @Skip + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + fail(); + ctx.fireUserEventTriggered(evt); + } + + @Skip + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelWritabilityChanged(); + } + + @Skip + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + fail(); + ctx.fireExceptionCaught(cause); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + state--; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + state--; + } + + void assertSkipped() { + assertEquals(0, state); + Error error = errorRef; + if (error != null) { + throw error; + } + } + } + + final class OutboundCalledHandler extends ChannelOutboundHandlerAdapter { + private static final int MASK_BIND = 1; + private static final int MASK_CONNECT = 1 << 1; + private static final int MASK_DISCONNECT = 1 << 2; + private static final int MASK_CLOSE = 1 << 3; + private static final int MASK_REGISTER = 1 << 4; + private static final int MASK_DEREGISTER = 1 << 5; + private static final int MASK_READ = 1 << 6; + private static final int MASK_WRITE = 1 << 7; + private static final int MASK_FLUSH = 1 << 8; + private static final int MASK_ADDED = 1 << 9; + private static final int MASK_REMOVED = 1 << 10; + + private int executionMask; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + executionMask |= MASK_ADDED; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + executionMask |= MASK_REMOVED; + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { + executionMask |= MASK_BIND; + promise.setSuccess(); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) { + executionMask |= MASK_CONNECT; + promise.setSuccess(); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) { + executionMask |= MASK_DISCONNECT; + promise.setSuccess(); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) { + executionMask |= MASK_CLOSE; + promise.setSuccess(); + } + + @Override + public void register(ChannelHandlerContext ctx, ChannelPromise promise) { + executionMask |= MASK_REGISTER; + promise.setSuccess(); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) { + executionMask |= MASK_DEREGISTER; + promise.setSuccess(); + } + + @Override + public void read(ChannelHandlerContext ctx) { + executionMask |= MASK_READ; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + executionMask |= MASK_WRITE; + promise.setSuccess(); + } + + @Override + public void flush(ChannelHandlerContext ctx) { + executionMask |= MASK_FLUSH; + } + + void assertCalled() { + assertCalled("handlerAdded", MASK_ADDED); + assertCalled("handlerRemoved", MASK_REMOVED); + assertCalled("bind", MASK_BIND); + assertCalled("connect", MASK_CONNECT); + assertCalled("disconnect", MASK_DISCONNECT); + assertCalled("close", MASK_CLOSE); + assertCalled("register", MASK_REGISTER); + assertCalled("deregister", MASK_DEREGISTER); + assertCalled("read", MASK_READ); + assertCalled("write", MASK_WRITE); + assertCalled("flush", MASK_FLUSH); + } + + private void assertCalled(String methodName, int mask) { + assertTrue(methodName + " was not called", (executionMask & mask) != 0); + } + } + + final class InboundCalledHandler extends ChannelInboundHandlerAdapter { + + private static final int MASK_CHANNEL_REGISTER = 1; + private static final int MASK_CHANNEL_UNREGISTER = 1 << 1; + private static final int MASK_CHANNEL_ACTIVE = 1 << 2; + private static final int MASK_CHANNEL_INACTIVE = 1 << 3; + private static final int MASK_CHANNEL_READ = 1 << 4; + private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 5; + private static final int MASK_USER_EVENT_TRIGGERED = 1 << 6; + private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 7; + private static final int MASK_EXCEPTION_CAUGHT = 1 << 8; + private static final int MASK_ADDED = 1 << 9; + private static final int MASK_REMOVED = 1 << 10; + + private int executionMask; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + executionMask |= MASK_ADDED; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + executionMask |= MASK_REMOVED; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_REGISTER; + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_UNREGISTER; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_ACTIVE; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_INACTIVE; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + executionMask |= MASK_CHANNEL_READ; + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_READ_COMPLETE; + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + executionMask |= MASK_USER_EVENT_TRIGGERED; + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_WRITABILITY_CHANGED; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + executionMask |= MASK_EXCEPTION_CAUGHT; + } + + void assertCalled() { + assertCalled("handlerAdded", MASK_ADDED); + assertCalled("handlerRemoved", MASK_REMOVED); + assertCalled("channelRegistered", MASK_CHANNEL_REGISTER); + assertCalled("channelUnregistered", MASK_CHANNEL_UNREGISTER); + assertCalled("channelActive", MASK_CHANNEL_ACTIVE); + assertCalled("channelInactive", MASK_CHANNEL_INACTIVE); + assertCalled("channelRead", MASK_CHANNEL_READ); + assertCalled("channelReadComplete", MASK_CHANNEL_READ_COMPLETE); + assertCalled("userEventTriggered", MASK_USER_EVENT_TRIGGERED); + assertCalled("channelWritabilityChanged", MASK_CHANNEL_WRITABILITY_CHANGED); + assertCalled("exceptionCaught", MASK_EXCEPTION_CAUGHT); + } + + private void assertCalled(String methodName, int mask) { + assertTrue(methodName + " was not called", (executionMask & mask) != 0); + } + } + + OutboundCalledHandler outboundCalledHandler = new OutboundCalledHandler(); + SkipHandler skipHandler = new SkipHandler(); + InboundCalledHandler inboundCalledHandler = new InboundCalledHandler(); + pipeline.addLast(outboundCalledHandler, skipHandler, inboundCalledHandler); + + pipeline.fireChannelRegistered(); + pipeline.fireChannelUnregistered(); + pipeline.fireChannelActive(); + pipeline.fireChannelInactive(); + pipeline.fireChannelRead(""); + pipeline.fireChannelReadComplete(); + pipeline.fireChannelWritabilityChanged(); + pipeline.fireUserEventTriggered(""); + pipeline.fireExceptionCaught(new Exception()); + + pipeline.register().syncUninterruptibly(); + pipeline.deregister().syncUninterruptibly(); + pipeline.bind(new SocketAddress() { }).syncUninterruptibly(); + pipeline.connect(new SocketAddress() { }).syncUninterruptibly(); + pipeline.disconnect().syncUninterruptibly(); + pipeline.close().syncUninterruptibly(); + pipeline.write(""); + pipeline.flush(); + pipeline.read(); + + pipeline.remove(outboundCalledHandler); + pipeline.remove(inboundCalledHandler); + pipeline.remove(skipHandler); + + assertFalse(channel.finish()); + + outboundCalledHandler.assertCalled(); + inboundCalledHandler.assertCalled(); + skipHandler.assertSkipped(); + } + @Test(timeout = 5000) public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException { handlerAddedStateUpdatedBeforeHandlerAddedDone(true);