Skip execution of Channel*Handler method if annotated with @Skip and just use the next handler in the pipeline. (#8723)

Motivation:

Invoking ChannelHandlers is not free and can result in some overhead when the ChannelPipeline becomes very long. This is especially true if most handlers will just forward the call to the next handler in the pipeline. When the user extends Channel*HandlerAdapter we can easily detect if can just skip the handler and invoke the next handler in the pipeline directly. This reduce the overhead of dispatch but also reduce the call-stack in many cases.

Modifications:

Detect if we can skip the handler when walking the pipeline.

Result:

Reduce overhead for long pipelines.

Benchmark                                       (extraHandlers)   Mode  Cnt       Score      Error  Units
DefaultChannelPipelineBenchmark.propagateEventOld             4  thrpt   10  267313.031 ± 9131.140  ops/s
DefaultChannelPipelineBenchmark.propagateEvent                4  thrpt   10  824825.673 ± 12727.594  ops/s
This commit is contained in:
Norman Maurer 2019-01-22 08:58:58 +01:00 committed by GitHub
parent 2df8bca8da
commit 8fdf373557
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 706 additions and 42 deletions

View File

@ -0,0 +1,84 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@State(Scope.Benchmark)
public class DefaultChannelPipelineBenchmark extends AbstractMicrobenchmark {
private static final ChannelInboundHandler NOOP_HANDLER = new ChannelInboundHandlerAdapter() {
@Override
public boolean isSharable() {
return true;
}
};
private static final ChannelInboundHandler CONSUMING_HANDLER = new ChannelInboundHandlerAdapter() {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// NOOP
}
@Override
public boolean isSharable() {
return true;
}
};
@Param({ "4" })
public int extraHandlers;
private ChannelPipeline pipeline;
@Setup(Level.Iteration)
public void setup() {
pipeline = new EmbeddedChannel().pipeline();
for (int i = 0; i < extraHandlers; i++) {
pipeline.addLast(NOOP_HANDLER);
}
pipeline.addLast(CONSUMING_HANDLER);
}
@TearDown
public void tearDown() {
pipeline.channel().close();
}
@Benchmark
public void propagateEvent(Blackhole hole) {
for (int i = 0; i < 100; i++) {
hole.consume(pipeline.fireChannelReadComplete());
}
}
}

View File

@ -23,7 +23,9 @@ import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.OrderedEventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.ObjectUtil;
@ -33,6 +35,10 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@ -63,8 +69,42 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
*/
private static final int INIT = 0;
private final boolean inbound;
private final boolean outbound;
// Using to mask which methods must be called for a ChannelHandler.
private static final int MASK_EXCEPTION_CAUGHT = 1;
private static final int MASK_CHANNEL_REGISTERED = 1 << 1;
private static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
private static final int MASK_CHANNEL_ACTIVE = 1 << 3;
private static final int MASK_CHANNEL_INACTIVE = 1 << 4;
private static final int MASK_CHANNEL_READ = 1 << 5;
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
private static final int MASK_BIND = 1 << 9;
private static final int MASK_CONNECT = 1 << 10;
private static final int MASK_DISCONNECT = 1 << 11;
private static final int MASK_CLOSE = 1 << 12;
private static final int MASK_REGISTER = 1 << 13;
private static final int MASK_DEREGISTER = 1 << 14;
private static final int MASK_READ = 1 << 15;
private static final int MASK_WRITE = 1 << 16;
private static final int MASK_FLUSH = 1 << 17;
private static final int MASK_ALL_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_REGISTER | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
@Override
protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
}
};
private final int executionMask;
private final DefaultChannelPipeline pipeline;
private final String name;
private final boolean ordered;
@ -84,16 +124,122 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
/**
* Return the {@code executionMask}.
*/
private static int mask(Class<? extends ChannelHandler> clazz) {
// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
// lookup in the future.
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
/**
* Calculate the {@code executionMask}.
*/
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "register", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_REGISTER;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
@SuppressWarnings("rawtypes")
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(ChannelHandler.Skip.class);
}
});
}
@Override
public Channel channel() {
return pipeline.channel();
@ -125,7 +271,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
@ -157,7 +303,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireChannelUnregistered() {
invokeChannelUnregistered(findContextInbound());
invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
return this;
}
@ -189,7 +335,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound());
invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
return this;
}
@ -221,7 +367,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireChannelInactive() {
invokeChannelInactive(findContextInbound());
invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
return this;
}
@ -253,7 +399,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
invokeExceptionCaught(next, cause);
invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
return this;
}
@ -304,7 +450,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
invokeUserEventTriggered(findContextInbound(), event);
invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
return this;
}
@ -337,7 +483,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
@ -370,7 +516,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireChannelReadComplete() {
invokeChannelReadComplete(findContextInbound());
invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
return this;
}
@ -406,7 +552,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
invokeChannelWritabilityChanged(findContextInbound());
invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
return this;
}
@ -485,7 +631,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
@ -529,7 +675,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
@ -563,7 +709,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
@ -607,7 +753,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeClose(promise);
@ -642,7 +788,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_REGISTER);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRegister(promise);
@ -677,7 +823,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeDeregister(promise);
@ -707,7 +853,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
@ -783,7 +929,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
@ -846,7 +992,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
@ -977,19 +1123,19 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return false;
}
private AbstractChannelHandlerContext findContextInbound() {
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
private AbstractChannelHandlerContext findContextOutbound() {
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
} while ((ctx.executionMask & mask) == 0);
return ctx;
}

View File

@ -32,6 +32,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
@ -44,6 +45,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
@ -56,6 +58,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
@ -68,6 +71,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
@ -79,6 +83,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.register(promise);
@ -90,6 +95,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
@ -101,6 +107,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
@ -112,6 +119,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
@ -123,6 +131,7 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();

View File

@ -215,4 +215,30 @@ public interface ChannelHandler {
@interface Sharable {
// no value
}
/**
* Indicates that the annotated event handler method in {@link ChannelHandler} will not be invoked by
* {@link ChannelPipeline}. This annotation is only useful when your handler method implementation
* only passes the event through to the next handler, like the following:
*
* <pre>
* {@code @Skip}
* {@code @Override}
* public void channelActive({@link ChannelHandlerContext} ctx) {
* ctx.fireChannelActive(); // do nothing but passing through to the next handler
* }
* </pre>
*
* <p>
* Note that this annotation is not {@linkplain Inherited inherited}. If you override a method annotated with
* {@link Skip}, it will not be skipped anymore. Similarly, you can override a method not annotated with
* {@link Skip} and simply pass the event through to the next handler, which reverses the behavior of the
* supertype.
* </p>
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Skip {
// no value
}
}

View File

@ -82,6 +82,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler {
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);

View File

@ -37,6 +37,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
@ -48,6 +49,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
@ -59,6 +61,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
@ -70,6 +73,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
@ -81,6 +85,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
@ -92,6 +97,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
@ -103,6 +109,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
@ -114,6 +121,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
@ -125,6 +133,7 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {

View File

@ -29,6 +29,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
@ -41,6 +42,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
@ -53,6 +55,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
@ -65,6 +68,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
@ -77,6 +81,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.register(promise);
@ -88,6 +93,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
@ -99,6 +105,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
@ -110,6 +117,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
@ -121,6 +129,7 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();

View File

@ -16,6 +16,7 @@
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ObjectUtil;
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
@ -23,10 +24,7 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
super(pipeline, executor, name, ObjectUtil.checkNotNull(handler, "handler").getClass());
this.handler = handler;
}
@ -34,12 +32,4 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
public ChannelHandler handler() {
return handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}

View File

@ -36,7 +36,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.WeakHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
@ -1149,7 +1148,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
@ -1212,7 +1211,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, true, true);
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@ -1282,11 +1281,13 @@ public class DefaultChannelPipeline implements ChannelPipeline {
unsafe.flush();
}
@Skip
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
@ -1302,31 +1303,37 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();

View File

@ -43,6 +43,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@ -1197,6 +1198,388 @@ public class DefaultChannelPipelineTest {
}
}
@Test
public void testSkipHandlerMethodsIfAnnotated() {
EmbeddedChannel channel = new EmbeddedChannel(true);
ChannelPipeline pipeline = channel.pipeline();
final class SkipHandler implements ChannelInboundHandler, ChannelOutboundHandler {
private int state = 2;
private Error errorRef;
private void fail() {
errorRef = new AssertionError("Method should never been called");
}
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
fail();
ctx.bind(localAddress, promise);
}
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) {
fail();
ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
fail();
ctx.disconnect(promise);
}
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
fail();
ctx.close(promise);
}
@Skip
@Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
fail();
ctx.register(promise);
}
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
fail();
ctx.deregister(promise);
}
@Skip
@Override
public void read(ChannelHandlerContext ctx) {
fail();
ctx.read();
}
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
fail();
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) {
fail();
ctx.flush();
}
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelRegistered();
}
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelUnregistered();
}
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelActive();
}
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelInactive();
}
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
fail();
ctx.fireChannelRead(msg);
}
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelReadComplete();
}
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
fail();
ctx.fireUserEventTriggered(evt);
}
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
fail();
ctx.fireChannelWritabilityChanged();
}
@Skip
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
fail();
ctx.fireExceptionCaught(cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
state--;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
state--;
}
void assertSkipped() {
assertEquals(0, state);
Error error = errorRef;
if (error != null) {
throw error;
}
}
}
final class OutboundCalledHandler extends ChannelOutboundHandlerAdapter {
private static final int MASK_BIND = 1;
private static final int MASK_CONNECT = 1 << 1;
private static final int MASK_DISCONNECT = 1 << 2;
private static final int MASK_CLOSE = 1 << 3;
private static final int MASK_REGISTER = 1 << 4;
private static final int MASK_DEREGISTER = 1 << 5;
private static final int MASK_READ = 1 << 6;
private static final int MASK_WRITE = 1 << 7;
private static final int MASK_FLUSH = 1 << 8;
private static final int MASK_ADDED = 1 << 9;
private static final int MASK_REMOVED = 1 << 10;
private int executionMask;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
executionMask |= MASK_ADDED;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
executionMask |= MASK_REMOVED;
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
executionMask |= MASK_BIND;
promise.setSuccess();
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) {
executionMask |= MASK_CONNECT;
promise.setSuccess();
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
executionMask |= MASK_DISCONNECT;
promise.setSuccess();
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
executionMask |= MASK_CLOSE;
promise.setSuccess();
}
@Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
executionMask |= MASK_REGISTER;
promise.setSuccess();
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
executionMask |= MASK_DEREGISTER;
promise.setSuccess();
}
@Override
public void read(ChannelHandlerContext ctx) {
executionMask |= MASK_READ;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
executionMask |= MASK_WRITE;
promise.setSuccess();
}
@Override
public void flush(ChannelHandlerContext ctx) {
executionMask |= MASK_FLUSH;
}
void assertCalled() {
assertCalled("handlerAdded", MASK_ADDED);
assertCalled("handlerRemoved", MASK_REMOVED);
assertCalled("bind", MASK_BIND);
assertCalled("connect", MASK_CONNECT);
assertCalled("disconnect", MASK_DISCONNECT);
assertCalled("close", MASK_CLOSE);
assertCalled("register", MASK_REGISTER);
assertCalled("deregister", MASK_DEREGISTER);
assertCalled("read", MASK_READ);
assertCalled("write", MASK_WRITE);
assertCalled("flush", MASK_FLUSH);
}
private void assertCalled(String methodName, int mask) {
assertTrue(methodName + " was not called", (executionMask & mask) != 0);
}
}
final class InboundCalledHandler extends ChannelInboundHandlerAdapter {
private static final int MASK_CHANNEL_REGISTER = 1;
private static final int MASK_CHANNEL_UNREGISTER = 1 << 1;
private static final int MASK_CHANNEL_ACTIVE = 1 << 2;
private static final int MASK_CHANNEL_INACTIVE = 1 << 3;
private static final int MASK_CHANNEL_READ = 1 << 4;
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 5;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 6;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 7;
private static final int MASK_EXCEPTION_CAUGHT = 1 << 8;
private static final int MASK_ADDED = 1 << 9;
private static final int MASK_REMOVED = 1 << 10;
private int executionMask;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
executionMask |= MASK_ADDED;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
executionMask |= MASK_REMOVED;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_REGISTER;
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_UNREGISTER;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_ACTIVE;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_INACTIVE;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
executionMask |= MASK_CHANNEL_READ;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_READ_COMPLETE;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
executionMask |= MASK_USER_EVENT_TRIGGERED;
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
executionMask |= MASK_CHANNEL_WRITABILITY_CHANGED;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
executionMask |= MASK_EXCEPTION_CAUGHT;
}
void assertCalled() {
assertCalled("handlerAdded", MASK_ADDED);
assertCalled("handlerRemoved", MASK_REMOVED);
assertCalled("channelRegistered", MASK_CHANNEL_REGISTER);
assertCalled("channelUnregistered", MASK_CHANNEL_UNREGISTER);
assertCalled("channelActive", MASK_CHANNEL_ACTIVE);
assertCalled("channelInactive", MASK_CHANNEL_INACTIVE);
assertCalled("channelRead", MASK_CHANNEL_READ);
assertCalled("channelReadComplete", MASK_CHANNEL_READ_COMPLETE);
assertCalled("userEventTriggered", MASK_USER_EVENT_TRIGGERED);
assertCalled("channelWritabilityChanged", MASK_CHANNEL_WRITABILITY_CHANGED);
assertCalled("exceptionCaught", MASK_EXCEPTION_CAUGHT);
}
private void assertCalled(String methodName, int mask) {
assertTrue(methodName + " was not called", (executionMask & mask) != 0);
}
}
OutboundCalledHandler outboundCalledHandler = new OutboundCalledHandler();
SkipHandler skipHandler = new SkipHandler();
InboundCalledHandler inboundCalledHandler = new InboundCalledHandler();
pipeline.addLast(outboundCalledHandler, skipHandler, inboundCalledHandler);
pipeline.fireChannelRegistered();
pipeline.fireChannelUnregistered();
pipeline.fireChannelActive();
pipeline.fireChannelInactive();
pipeline.fireChannelRead("");
pipeline.fireChannelReadComplete();
pipeline.fireChannelWritabilityChanged();
pipeline.fireUserEventTriggered("");
pipeline.fireExceptionCaught(new Exception());
pipeline.register().syncUninterruptibly();
pipeline.deregister().syncUninterruptibly();
pipeline.bind(new SocketAddress() { }).syncUninterruptibly();
pipeline.connect(new SocketAddress() { }).syncUninterruptibly();
pipeline.disconnect().syncUninterruptibly();
pipeline.close().syncUninterruptibly();
pipeline.write("");
pipeline.flush();
pipeline.read();
pipeline.remove(outboundCalledHandler);
pipeline.remove(inboundCalledHandler);
pipeline.remove(skipHandler);
assertFalse(channel.finish());
outboundCalledHandler.assertCalled();
inboundCalledHandler.assertCalled();
skipHandler.assertSkipped();
}
@Test(timeout = 5000)
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);