Skip execution of Channel*Handler method if annotated with @Skip and … (#8988)

Motivation:

Invoking ChannelHandlers is not free and can result in some overhead when the ChannelPipeline becomes very long. This is especially true if most handlers will just forward the call to the next handler in the pipeline. When the user extends Channel*HandlerAdapter we can easily detect if can just skip the handler and invoke the next handler in the pipeline directly. This reduce the overhead of dispatch but also reduce the call-stack in many cases.

This backports https://github.com/netty/netty/pull/8723 and https://github.com/netty/netty/pull/8987 to 4.1

Modifications:

Detect if we can skip the handler when walking the pipeline.

Result:

Reduce overhead for long pipelines.

Benchmark                                       (extraHandlers)   Mode  Cnt       Score      Error  Units
DefaultChannelPipelineBenchmark.propagateEventOld             4  thrpt   10  267313.031 ± 9131.140  ops/s
DefaultChannelPipelineBenchmark.propagateEvent                4  thrpt   10  824825.673 ± 12727.594  ops/s
This commit is contained in:
Norman Maurer 2019-04-09 09:36:52 +02:00 committed by GitHub
parent c3c05e8570
commit 8f7ef1cabb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 724 additions and 42 deletions

View File

@ -0,0 +1,84 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@State(Scope.Benchmark)
public class DefaultChannelPipelineBenchmark extends AbstractMicrobenchmark {
private static final ChannelHandler NOOP_HANDLER = new ChannelInboundHandlerAdapter() {
@Override
public boolean isSharable() {
return true;
}
};
private static final ChannelHandler CONSUMING_HANDLER = new ChannelInboundHandlerAdapter() {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// NOOP
}
@Override
public boolean isSharable() {
return true;
}
};
@Param({ "4" })
public int extraHandlers;
private ChannelPipeline pipeline;
@Setup(Level.Iteration)
public void setup() {
pipeline = new EmbeddedChannel().pipeline();
for (int i = 0; i < extraHandlers; i++) {
pipeline.addLast(NOOP_HANDLER);
}
pipeline.addLast(CONSUMING_HANDLER);
}
@TearDown
public void tearDown() {
pipeline.channel().close();
}
@Benchmark
public void propagateEvent(Blackhole hole) {
for (int i = 0; i < 100; i++) {
hole.consume(pipeline.fireChannelReadComplete());
}
}
}

View File

@ -34,6 +34,25 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
import static io.netty.channel.ChannelHandlerMask.MASK_READ;
import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
import static io.netty.channel.ChannelHandlerMask.mask;
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
@ -61,11 +80,10 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
*/
private static final int INIT = 0;
private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
private final boolean ordered;
private final int executionMask;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
@ -78,13 +96,12 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
@ -120,7 +137,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
@ -152,7 +169,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireChannelUnregistered() {
invokeChannelUnregistered(findContextInbound());
invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
return this;
}
@ -184,7 +201,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound());
invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
return this;
}
@ -216,7 +233,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireChannelInactive() {
invokeChannelInactive(findContextInbound());
invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
return this;
}
@ -248,7 +265,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
invokeExceptionCaught(next, cause);
invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
return this;
}
@ -299,7 +316,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
invokeUserEventTriggered(findContextInbound(), event);
invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
return this;
}
@ -332,7 +349,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
@ -365,7 +382,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireChannelReadComplete() {
invokeChannelReadComplete(findContextInbound());
invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
return this;
}
@ -396,7 +413,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
invokeChannelWritabilityChanged(findContextInbound());
invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
return this;
}
@ -465,7 +482,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
@ -509,7 +526,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
@ -543,7 +560,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
@ -587,7 +604,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeClose(promise);
@ -622,7 +639,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeDeregister(promise);
@ -652,7 +669,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
@ -709,7 +726,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
@Override
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
@ -768,7 +785,8 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
throw e;
}
AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -899,19 +917,19 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return false;
}
private AbstractChannelHandlerContext findContextInbound() {
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
private AbstractChannelHandlerContext findContextOutbound() {
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
} while ((ctx.executionMask & mask) == 0);
return ctx;
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
import java.net.SocketAddress;
/**
@ -32,6 +34,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
@ -44,6 +47,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
@ -56,6 +60,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
@ -68,6 +73,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
@ -79,6 +85,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
@ -90,6 +97,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
@ -101,6 +109,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
@ -112,6 +121,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();

View File

@ -16,6 +16,7 @@
package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
import io.netty.util.internal.InternalThreadLocalMap;
import java.util.Map;
@ -84,6 +85,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler {
*
* @deprecated is part of {@link ChannelInboundHandler}
*/
@Skip
@Override
@Deprecated
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -0,0 +1,189 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.net.SocketAddress;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.WeakHashMap;
final class ChannelHandlerMask {
// Using to mask which methods must be called for a ChannelHandler.
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
@Override
protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
}
};
/**
* Return the {@code executionMask}.
*/
static int mask(Class<? extends ChannelHandler> clazz) {
// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
// lookup in the future.
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
/**
* Calculate the {@code executionMask}.
*/
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
@SuppressWarnings("rawtypes")
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(Skip.class);
}
});
}
private ChannelHandlerMask() { }
/**
* Indicates that the annotated event handler method in {@link ChannelHandler} will not be invoked by
* {@link ChannelPipeline} and so <strong>MUST</strong> only be used when the {@link ChannelHandler}
* method does nothing except forward to the next {@link ChannelHandler} in the pipeline.
* <p>
* Note that this annotation is not {@linkplain Inherited inherited}. If a user overrides a method annotated with
* {@link Skip}, it will not be skipped anymore. Similarly, the user can override a method not annotated with
* {@link Skip} and simply pass the event through to the next handler, which reverses the behavior of the
* supertype.
* </p>
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Skip {
// no value
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
/**
* Abstract base class for {@link ChannelInboundHandler} implementations which provide
* implementations of all of their methods.
@ -37,6 +39,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
@ -48,6 +51,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
@ -59,6 +63,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
@ -70,6 +75,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
@ -81,6 +87,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
@ -92,6 +99,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
@ -103,6 +111,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
@ -114,6 +123,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
@ -125,6 +135,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
@SuppressWarnings("deprecation")
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
import java.net.SocketAddress;
/**
@ -29,6 +31,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
@ -41,6 +44,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
@ -53,6 +57,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
@ -65,6 +70,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
@ -77,6 +83,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
@ -88,6 +95,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
@ -99,6 +107,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
@ -110,6 +119,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();

View File

@ -23,10 +23,7 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
@ -34,12 +31,4 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
public ChannelHandler handler() {
return handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}

View File

@ -1243,7 +1243,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
@ -1306,7 +1306,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, true, true);
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}

View File

@ -21,10 +21,10 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerMask.Skip;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
@ -45,6 +45,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@ -1223,6 +1224,374 @@ public class DefaultChannelPipelineTest {
}
}
@Test
public void testSkipHandlerMethodsIfAnnotated() {
EmbeddedChannel channel = new EmbeddedChannel(true);
ChannelPipeline pipeline = channel.pipeline();
final class SkipHandler implements ChannelInboundHandler, ChannelOutboundHandler {
private int state = 2;
private Error errorRef;
private void fail() {
errorRef = new AssertionError("Method should never been called");
}
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
fail();
ctx.bind(localAddress, promise);
}
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) {
fail();
ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
fail();
ctx.disconnect(promise);
}
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
fail();
ctx.close(promise);
}
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
fail();
ctx.deregister(promise);
}
@Skip
@Override
public void read(ChannelHandlerContext ctx) {
fail();
ctx.read();
}
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
fail();
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) {
fail();
ctx.flush();
}
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelRegistered();
}
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelUnregistered();
}
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelActive();
}
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelInactive();
}
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
fail();
ctx.fireChannelRead(msg);
}
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelReadComplete();
}
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
fail();
ctx.fireUserEventTriggered(evt);
}
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelWritabilityChanged();
}
@Skip
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
fail();
ctx.fireExceptionCaught(cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
state--;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
state--;
}
void assertSkipped() {
assertEquals(0, state);
Error error = errorRef;
if (error != null) {
throw error;
}
}
}
final class OutboundCalledHandler extends ChannelOutboundHandlerAdapter {
private static final int MASK_BIND = 1;
private static final int MASK_CONNECT = 1 << 1;
private static final int MASK_DISCONNECT = 1 << 2;
private static final int MASK_CLOSE = 1 << 3;
private static final int MASK_DEREGISTER = 1 << 4;
private static final int MASK_READ = 1 << 5;
private static final int MASK_WRITE = 1 << 6;
private static final int MASK_FLUSH = 1 << 7;
private static final int MASK_ADDED = 1 << 8;
private static final int MASK_REMOVED = 1 << 9;
private int executionMask;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
executionMask |= MASK_ADDED;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
executionMask |= MASK_REMOVED;
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
executionMask |= MASK_BIND;
promise.setSuccess();
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) {
executionMask |= MASK_CONNECT;
promise.setSuccess();
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
executionMask |= MASK_DISCONNECT;
promise.setSuccess();
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
executionMask |= MASK_CLOSE;
promise.setSuccess();
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
executionMask |= MASK_DEREGISTER;
promise.setSuccess();
}
@Override
public void read(ChannelHandlerContext ctx) {
executionMask |= MASK_READ;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
executionMask |= MASK_WRITE;
promise.setSuccess();
}
@Override
public void flush(ChannelHandlerContext ctx) {
executionMask |= MASK_FLUSH;
}
void assertCalled() {
assertCalled("handlerAdded", MASK_ADDED);
assertCalled("handlerRemoved", MASK_REMOVED);
assertCalled("bind", MASK_BIND);
assertCalled("connect", MASK_CONNECT);
assertCalled("disconnect", MASK_DISCONNECT);
assertCalled("close", MASK_CLOSE);
assertCalled("deregister", MASK_DEREGISTER);
assertCalled("read", MASK_READ);
assertCalled("write", MASK_WRITE);
assertCalled("flush", MASK_FLUSH);
}
private void assertCalled(String methodName, int mask) {
assertTrue(methodName + " was not called", (executionMask & mask) != 0);
}
}
final class InboundCalledHandler extends ChannelInboundHandlerAdapter {
private static final int MASK_CHANNEL_REGISTER = 1;
private static final int MASK_CHANNEL_UNREGISTER = 1 << 1;
private static final int MASK_CHANNEL_ACTIVE = 1 << 2;
private static final int MASK_CHANNEL_INACTIVE = 1 << 3;
private static final int MASK_CHANNEL_READ = 1 << 4;
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 5;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 6;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 7;
private static final int MASK_EXCEPTION_CAUGHT = 1 << 8;
private static final int MASK_ADDED = 1 << 9;
private static final int MASK_REMOVED = 1 << 10;
private int executionMask;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
executionMask |= MASK_ADDED;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
executionMask |= MASK_REMOVED;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_REGISTER;
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_UNREGISTER;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_ACTIVE;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_INACTIVE;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
executionMask |= MASK_CHANNEL_READ;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_READ_COMPLETE;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
executionMask |= MASK_USER_EVENT_TRIGGERED;
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_WRITABILITY_CHANGED;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
executionMask |= MASK_EXCEPTION_CAUGHT;
}
void assertCalled() {
assertCalled("handlerAdded", MASK_ADDED);
assertCalled("handlerRemoved", MASK_REMOVED);
assertCalled("channelRegistered", MASK_CHANNEL_REGISTER);
assertCalled("channelUnregistered", MASK_CHANNEL_UNREGISTER);
assertCalled("channelActive", MASK_CHANNEL_ACTIVE);
assertCalled("channelInactive", MASK_CHANNEL_INACTIVE);
assertCalled("channelRead", MASK_CHANNEL_READ);
assertCalled("channelReadComplete", MASK_CHANNEL_READ_COMPLETE);
assertCalled("userEventTriggered", MASK_USER_EVENT_TRIGGERED);
assertCalled("channelWritabilityChanged", MASK_CHANNEL_WRITABILITY_CHANGED);
assertCalled("exceptionCaught", MASK_EXCEPTION_CAUGHT);
}
private void assertCalled(String methodName, int mask) {
assertTrue(methodName + " was not called", (executionMask & mask) != 0);
}
}
OutboundCalledHandler outboundCalledHandler = new OutboundCalledHandler();
SkipHandler skipHandler = new SkipHandler();
InboundCalledHandler inboundCalledHandler = new InboundCalledHandler();
pipeline.addLast(outboundCalledHandler, skipHandler, inboundCalledHandler);
pipeline.fireChannelRegistered();
pipeline.fireChannelUnregistered();
pipeline.fireChannelActive();
pipeline.fireChannelInactive();
pipeline.fireChannelRead("");
pipeline.fireChannelReadComplete();
pipeline.fireChannelWritabilityChanged();
pipeline.fireUserEventTriggered("");
pipeline.fireExceptionCaught(new Exception());
pipeline.deregister().syncUninterruptibly();
pipeline.bind(new SocketAddress() {
}).syncUninterruptibly();
pipeline.connect(new SocketAddress() {
}).syncUninterruptibly();
pipeline.disconnect().syncUninterruptibly();
pipeline.close().syncUninterruptibly();
pipeline.write("");
pipeline.flush();
pipeline.read();
pipeline.remove(outboundCalledHandler);
pipeline.remove(inboundCalledHandler);
pipeline.remove(skipHandler);
assertFalse(channel.finish());
outboundCalledHandler.assertCalled();
inboundCalledHandler.assertCalled();
skipHandler.assertSkipped();
}
@Test
public void testWriteThrowsReleaseMessage() {
testWriteThrowsReleaseMessage0(false);