diff --git a/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineBenchmark.java new file mode 100644 index 0000000000..c8a24091a9 --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineBenchmark.java @@ -0,0 +1,84 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.channel; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.microbench.util.AbstractMicrobenchmark; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@State(Scope.Benchmark) +public class DefaultChannelPipelineBenchmark extends AbstractMicrobenchmark { + + private static final ChannelHandler NOOP_HANDLER = new ChannelInboundHandlerAdapter() { + @Override + public boolean isSharable() { + return true; + } + }; + + private static final ChannelHandler CONSUMING_HANDLER = new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + // NOOP + } + + @Override + public boolean isSharable() { + return true; + } + }; + + @Param({ "4" }) + public int extraHandlers; + + private ChannelPipeline pipeline; + + @Setup(Level.Iteration) + public void setup() { + pipeline = new EmbeddedChannel().pipeline(); + for (int i = 0; i < extraHandlers; i++) { + pipeline.addLast(NOOP_HANDLER); + } + pipeline.addLast(CONSUMING_HANDLER); + } + + @TearDown + public void tearDown() { + pipeline.channel().close(); + } + + @Benchmark + public void propagateEvent(Blackhole hole) { + for (int i = 0; i < 100; i++) { + hole.consume(pipeline.fireChannelReadComplete()); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index e3abe49dbe..7c68b50704 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -34,6 +34,25 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import static io.netty.channel.ChannelHandlerMask.MASK_BIND; +import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE; +import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE; +import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ; +import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE; +import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED; +import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED; +import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED; +import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE; +import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT; +import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER; +import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT; +import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT; +import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH; +import static io.netty.channel.ChannelHandlerMask.MASK_READ; +import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED; +import static io.netty.channel.ChannelHandlerMask.MASK_WRITE; +import static io.netty.channel.ChannelHandlerMask.mask; + abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); @@ -61,11 +80,10 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R */ private static final int INIT = 0; - private final boolean inbound; - private final boolean outbound; private final DefaultChannelPipeline pipeline; private final String name; private final boolean ordered; + private final int executionMask; // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. @@ -78,13 +96,12 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R private volatile int handlerState = INIT; - AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, - boolean inbound, boolean outbound) { + AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, + String name, Class handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; - this.inbound = inbound; - this.outbound = outbound; + this.executionMask = mask(handlerClass); // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; } @@ -120,7 +137,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireChannelRegistered() { - invokeChannelRegistered(findContextInbound()); + invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED)); return this; } @@ -152,7 +169,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireChannelUnregistered() { - invokeChannelUnregistered(findContextInbound()); + invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED)); return this; } @@ -184,7 +201,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireChannelActive() { - invokeChannelActive(findContextInbound()); + invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE)); return this; } @@ -216,7 +233,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireChannelInactive() { - invokeChannelInactive(findContextInbound()); + invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE)); return this; } @@ -248,7 +265,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { - invokeExceptionCaught(next, cause); + invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause); return this; } @@ -299,7 +316,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireUserEventTriggered(final Object event) { - invokeUserEventTriggered(findContextInbound(), event); + invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event); return this; } @@ -332,7 +349,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireChannelRead(final Object msg) { - invokeChannelRead(findContextInbound(), msg); + invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this; } @@ -365,7 +382,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireChannelReadComplete() { - invokeChannelReadComplete(findContextInbound()); + invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE)); return this; } @@ -396,7 +413,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireChannelWritabilityChanged() { - invokeChannelWritabilityChanged(findContextInbound()); + invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED)); return this; } @@ -465,7 +482,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); @@ -509,7 +526,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); @@ -543,7 +560,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // Translate disconnect to close if the channel has no notion of disconnect-reconnect. @@ -587,7 +604,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeClose(promise); @@ -622,7 +639,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeDeregister(promise); @@ -652,7 +669,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext read() { - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); @@ -709,7 +726,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext flush() { - final AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeFlush(); @@ -768,7 +785,8 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R throw e; } - AbstractChannelHandlerContext next = findContextOutbound(); + final AbstractChannelHandlerContext next = findContextOutbound(flush ? + (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { @@ -899,19 +917,19 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R return false; } - private AbstractChannelHandlerContext findContextInbound() { + private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; - } while (!ctx.inbound); + } while ((ctx.executionMask & mask) == 0); return ctx; } - private AbstractChannelHandlerContext findContextOutbound() { + private AbstractChannelHandlerContext findContextOutbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; - } while (!ctx.outbound); + } while ((ctx.executionMask & mask) == 0); return ctx; } diff --git a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java index 07c6484e50..eac4645f12 100644 --- a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java @@ -15,6 +15,8 @@ */ package io.netty.channel; +import io.netty.channel.ChannelHandlerMask.Skip; + import java.net.SocketAddress; /** @@ -32,6 +34,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -44,6 +47,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -56,6 +60,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { @@ -68,6 +73,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); @@ -79,6 +85,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); @@ -90,6 +97,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); @@ -101,6 +109,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); @@ -112,6 +121,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index ee380b5848..2041ebd12b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -16,6 +16,7 @@ package io.netty.channel; +import io.netty.channel.ChannelHandlerMask.Skip; import io.netty.util.internal.InternalThreadLocalMap; import java.util.Map; @@ -84,6 +85,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler { * * @deprecated is part of {@link ChannelInboundHandler} */ + @Skip @Override @Deprecated public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerMask.java b/transport/src/main/java/io/netty/channel/ChannelHandlerMask.java new file mode 100644 index 0000000000..35e8aa2635 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerMask.java @@ -0,0 +1,189 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.internal.PlatformDependent; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.net.SocketAddress; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import java.util.WeakHashMap; + +final class ChannelHandlerMask { + + // Using to mask which methods must be called for a ChannelHandler. + static final int MASK_EXCEPTION_CAUGHT = 1; + static final int MASK_CHANNEL_REGISTERED = 1 << 1; + static final int MASK_CHANNEL_UNREGISTERED = 1 << 2; + static final int MASK_CHANNEL_ACTIVE = 1 << 3; + static final int MASK_CHANNEL_INACTIVE = 1 << 4; + static final int MASK_CHANNEL_READ = 1 << 5; + static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6; + static final int MASK_USER_EVENT_TRIGGERED = 1 << 7; + static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8; + static final int MASK_BIND = 1 << 9; + static final int MASK_CONNECT = 1 << 10; + static final int MASK_DISCONNECT = 1 << 11; + static final int MASK_CLOSE = 1 << 12; + static final int MASK_DEREGISTER = 1 << 13; + static final int MASK_READ = 1 << 14; + static final int MASK_WRITE = 1 << 15; + static final int MASK_FLUSH = 1 << 16; + + private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED | + MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ | + MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED; + private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT | + MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH; + + private static final FastThreadLocal, Integer>> MASKS = + new FastThreadLocal, Integer>>() { + @Override + protected Map, Integer> initialValue() { + return new WeakHashMap, Integer>(32); + } + }; + + /** + * Return the {@code executionMask}. + */ + static int mask(Class clazz) { + // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast + // lookup in the future. + Map, Integer> cache = MASKS.get(); + Integer mask = cache.get(clazz); + if (mask == null) { + mask = mask0(clazz); + cache.put(clazz, mask); + } + return mask; + } + + /** + * Calculate the {@code executionMask}. + */ + private static int mask0(Class handlerType) { + int mask = MASK_EXCEPTION_CAUGHT; + try { + if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) { + mask |= MASK_ALL_INBOUND; + + if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_REGISTERED; + } + if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_UNREGISTERED; + } + if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_ACTIVE; + } + if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_INACTIVE; + } + if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) { + mask &= ~MASK_CHANNEL_READ; + } + if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_READ_COMPLETE; + } + if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) { + mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED; + } + if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) { + mask &= ~MASK_USER_EVENT_TRIGGERED; + } + } + + if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) { + mask |= MASK_ALL_OUTBOUND; + + if (isSkippable(handlerType, "bind", ChannelHandlerContext.class, + SocketAddress.class, ChannelPromise.class)) { + mask &= ~MASK_BIND; + } + if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class, + SocketAddress.class, ChannelPromise.class)) { + mask &= ~MASK_CONNECT; + } + if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) { + mask &= ~MASK_DISCONNECT; + } + if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) { + mask &= ~MASK_CLOSE; + } + if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) { + mask &= ~MASK_DEREGISTER; + } + if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) { + mask &= ~MASK_READ; + } + if (isSkippable(handlerType, "write", ChannelHandlerContext.class, + Object.class, ChannelPromise.class)) { + mask &= ~MASK_WRITE; + } + if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) { + mask &= ~MASK_FLUSH; + } + } + + if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) { + mask &= ~MASK_EXCEPTION_CAUGHT; + } + } catch (Exception e) { + // Should never reach here. + PlatformDependent.throwException(e); + } + + return mask; + } + + @SuppressWarnings("rawtypes") + private static boolean isSkippable( + final Class handlerType, final String methodName, final Class... paramTypes) throws Exception { + return AccessController.doPrivileged(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(Skip.class); + } + }); + } + + private ChannelHandlerMask() { } + + /** + * Indicates that the annotated event handler method in {@link ChannelHandler} will not be invoked by + * {@link ChannelPipeline} and so MUST only be used when the {@link ChannelHandler} + * method does nothing except forward to the next {@link ChannelHandler} in the pipeline. + *

+ * Note that this annotation is not {@linkplain Inherited inherited}. If a user overrides a method annotated with + * {@link Skip}, it will not be skipped anymore. Similarly, the user can override a method not annotated with + * {@link Skip} and simply pass the event through to the next handler, which reverses the behavior of the + * supertype. + *

+ */ + @Target(ElementType.METHOD) + @Retention(RetentionPolicy.RUNTIME) + @interface Skip { + // no value + } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java index c451329794..f9a88842af 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java @@ -15,6 +15,8 @@ */ package io.netty.channel; +import io.netty.channel.ChannelHandlerMask.Skip; + /** * Abstract base class for {@link ChannelInboundHandler} implementations which provide * implementations of all of their methods. @@ -37,6 +39,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); @@ -48,6 +51,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); @@ -59,6 +63,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); @@ -70,6 +75,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); @@ -81,6 +87,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); @@ -92,6 +99,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); @@ -103,6 +111,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); @@ -114,6 +123,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); @@ -125,6 +135,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen * * Sub-classes may override this method to change behavior. */ + @Skip @Override @SuppressWarnings("deprecation") public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index fa968925de..c68bfdbf8c 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -15,6 +15,8 @@ */ package io.netty.channel; +import io.netty.channel.ChannelHandlerMask.Skip; + import java.net.SocketAddress; /** @@ -29,6 +31,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -41,6 +44,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -53,6 +57,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { @@ -65,6 +70,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { @@ -77,6 +83,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); @@ -88,6 +95,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); @@ -99,6 +107,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); @@ -110,6 +119,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * * Sub-classes may override this method to change behavior. */ + @Skip @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 58454b817a..26f11d59a1 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -23,10 +23,7 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { - super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); - if (handler == null) { - throw new NullPointerException("handler"); - } + super(pipeline, executor, name, handler.getClass()); this.handler = handler; } @@ -34,12 +31,4 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { public ChannelHandler handler() { return handler; } - - private static boolean isInbound(ChannelHandler handler) { - return handler instanceof ChannelInboundHandler; - } - - private static boolean isOutbound(ChannelHandler handler) { - return handler instanceof ChannelOutboundHandler; - } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 2b307cc911..9017f14e6c 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -1243,7 +1243,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { - super(pipeline, null, TAIL_NAME, true, false); + super(pipeline, null, TAIL_NAME, TailContext.class); setAddComplete(); } @@ -1306,7 +1306,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { - super(pipeline, null, HEAD_NAME, true, true); + super(pipeline, null, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 65209d50cf..e2628e0f86 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -21,10 +21,10 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerMask.Skip; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.oio.OioEventLoopGroup; @@ -45,6 +45,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Test; +import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -1223,6 +1224,374 @@ public class DefaultChannelPipelineTest { } } + @Test + public void testSkipHandlerMethodsIfAnnotated() { + EmbeddedChannel channel = new EmbeddedChannel(true); + ChannelPipeline pipeline = channel.pipeline(); + + final class SkipHandler implements ChannelInboundHandler, ChannelOutboundHandler { + private int state = 2; + private Error errorRef; + + private void fail() { + errorRef = new AssertionError("Method should never been called"); + } + + @Skip + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { + fail(); + ctx.bind(localAddress, promise); + } + + @Skip + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) { + fail(); + ctx.connect(remoteAddress, localAddress, promise); + } + + @Skip + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) { + fail(); + ctx.disconnect(promise); + } + + @Skip + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) { + fail(); + ctx.close(promise); + } + + @Skip + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) { + fail(); + ctx.deregister(promise); + } + + @Skip + @Override + public void read(ChannelHandlerContext ctx) { + fail(); + ctx.read(); + } + + @Skip + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + fail(); + ctx.write(msg, promise); + } + + @Skip + @Override + public void flush(ChannelHandlerContext ctx) { + fail(); + ctx.flush(); + } + + @Skip + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelRegistered(); + } + + @Skip + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelUnregistered(); + } + + @Skip + @Override + public void channelActive(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelActive(); + } + + @Skip + @Override + public void channelInactive(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelInactive(); + } + + @Skip + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + fail(); + ctx.fireChannelRead(msg); + } + + @Skip + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelReadComplete(); + } + + @Skip + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + fail(); + ctx.fireUserEventTriggered(evt); + } + + @Skip + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + fail(); + ctx.fireChannelWritabilityChanged(); + } + + @Skip + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + fail(); + ctx.fireExceptionCaught(cause); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + state--; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + state--; + } + + void assertSkipped() { + assertEquals(0, state); + Error error = errorRef; + if (error != null) { + throw error; + } + } + } + + final class OutboundCalledHandler extends ChannelOutboundHandlerAdapter { + private static final int MASK_BIND = 1; + private static final int MASK_CONNECT = 1 << 1; + private static final int MASK_DISCONNECT = 1 << 2; + private static final int MASK_CLOSE = 1 << 3; + private static final int MASK_DEREGISTER = 1 << 4; + private static final int MASK_READ = 1 << 5; + private static final int MASK_WRITE = 1 << 6; + private static final int MASK_FLUSH = 1 << 7; + private static final int MASK_ADDED = 1 << 8; + private static final int MASK_REMOVED = 1 << 9; + + private int executionMask; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + executionMask |= MASK_ADDED; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + executionMask |= MASK_REMOVED; + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { + executionMask |= MASK_BIND; + promise.setSuccess(); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) { + executionMask |= MASK_CONNECT; + promise.setSuccess(); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) { + executionMask |= MASK_DISCONNECT; + promise.setSuccess(); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) { + executionMask |= MASK_CLOSE; + promise.setSuccess(); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) { + executionMask |= MASK_DEREGISTER; + promise.setSuccess(); + } + + @Override + public void read(ChannelHandlerContext ctx) { + executionMask |= MASK_READ; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + executionMask |= MASK_WRITE; + promise.setSuccess(); + } + + @Override + public void flush(ChannelHandlerContext ctx) { + executionMask |= MASK_FLUSH; + } + + void assertCalled() { + assertCalled("handlerAdded", MASK_ADDED); + assertCalled("handlerRemoved", MASK_REMOVED); + assertCalled("bind", MASK_BIND); + assertCalled("connect", MASK_CONNECT); + assertCalled("disconnect", MASK_DISCONNECT); + assertCalled("close", MASK_CLOSE); + assertCalled("deregister", MASK_DEREGISTER); + assertCalled("read", MASK_READ); + assertCalled("write", MASK_WRITE); + assertCalled("flush", MASK_FLUSH); + } + + private void assertCalled(String methodName, int mask) { + assertTrue(methodName + " was not called", (executionMask & mask) != 0); + } + } + + final class InboundCalledHandler extends ChannelInboundHandlerAdapter { + + private static final int MASK_CHANNEL_REGISTER = 1; + private static final int MASK_CHANNEL_UNREGISTER = 1 << 1; + private static final int MASK_CHANNEL_ACTIVE = 1 << 2; + private static final int MASK_CHANNEL_INACTIVE = 1 << 3; + private static final int MASK_CHANNEL_READ = 1 << 4; + private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 5; + private static final int MASK_USER_EVENT_TRIGGERED = 1 << 6; + private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 7; + private static final int MASK_EXCEPTION_CAUGHT = 1 << 8; + private static final int MASK_ADDED = 1 << 9; + private static final int MASK_REMOVED = 1 << 10; + + private int executionMask; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + executionMask |= MASK_ADDED; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + executionMask |= MASK_REMOVED; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_REGISTER; + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_UNREGISTER; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_ACTIVE; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_INACTIVE; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + executionMask |= MASK_CHANNEL_READ; + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_READ_COMPLETE; + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + executionMask |= MASK_USER_EVENT_TRIGGERED; + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + executionMask |= MASK_CHANNEL_WRITABILITY_CHANGED; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + executionMask |= MASK_EXCEPTION_CAUGHT; + } + + void assertCalled() { + assertCalled("handlerAdded", MASK_ADDED); + assertCalled("handlerRemoved", MASK_REMOVED); + assertCalled("channelRegistered", MASK_CHANNEL_REGISTER); + assertCalled("channelUnregistered", MASK_CHANNEL_UNREGISTER); + assertCalled("channelActive", MASK_CHANNEL_ACTIVE); + assertCalled("channelInactive", MASK_CHANNEL_INACTIVE); + assertCalled("channelRead", MASK_CHANNEL_READ); + assertCalled("channelReadComplete", MASK_CHANNEL_READ_COMPLETE); + assertCalled("userEventTriggered", MASK_USER_EVENT_TRIGGERED); + assertCalled("channelWritabilityChanged", MASK_CHANNEL_WRITABILITY_CHANGED); + assertCalled("exceptionCaught", MASK_EXCEPTION_CAUGHT); + } + + private void assertCalled(String methodName, int mask) { + assertTrue(methodName + " was not called", (executionMask & mask) != 0); + } + } + + OutboundCalledHandler outboundCalledHandler = new OutboundCalledHandler(); + SkipHandler skipHandler = new SkipHandler(); + InboundCalledHandler inboundCalledHandler = new InboundCalledHandler(); + pipeline.addLast(outboundCalledHandler, skipHandler, inboundCalledHandler); + + pipeline.fireChannelRegistered(); + pipeline.fireChannelUnregistered(); + pipeline.fireChannelActive(); + pipeline.fireChannelInactive(); + pipeline.fireChannelRead(""); + pipeline.fireChannelReadComplete(); + pipeline.fireChannelWritabilityChanged(); + pipeline.fireUserEventTriggered(""); + pipeline.fireExceptionCaught(new Exception()); + + pipeline.deregister().syncUninterruptibly(); + pipeline.bind(new SocketAddress() { + }).syncUninterruptibly(); + pipeline.connect(new SocketAddress() { + }).syncUninterruptibly(); + pipeline.disconnect().syncUninterruptibly(); + pipeline.close().syncUninterruptibly(); + pipeline.write(""); + pipeline.flush(); + pipeline.read(); + + pipeline.remove(outboundCalledHandler); + pipeline.remove(inboundCalledHandler); + pipeline.remove(skipHandler); + + assertFalse(channel.finish()); + + outboundCalledHandler.assertCalled(); + inboundCalledHandler.assertCalled(); + skipHandler.assertSkipped(); + } + @Test public void testWriteThrowsReleaseMessage() { testWriteThrowsReleaseMessage0(false);