Fix an event ordering issue

Motivation:

In the early days of 5.0, we merged ChannelInboundHandler and
ChannelOutboundHandler into ChannelHandler, and introduced the
annotation called 'Skip'.  The annotation 'Skip' was introduced to
determine which handler methods are no-op (i.e. simply forwarding the
event to the next handler) so that DefaultChannelHandlerContext doesn't
even need to submit an event-invoking task to an EventExecutor,
significantly reducing the context switches.

However, this introduced a regression for the handlers which implemented
write() but not flush(). Because flush() was skippable for such
handlers, flush() event went through to the next handler before write() does.

To address this problem, we came up with a naive workaround that sets
MASK_FLUSH when MASK_WRITE is set.

Although the previous workaround works fine for many cases, we still
seem to have an event ordering problem.  We keep seeing the intermittant
failure of LocalTransportThreadModelTest.testStagedExecution(), because
other handler methods are still skipped.

Modifications:

We do not skip the execution of handler methods annotated with 'Skip'
unless all inbound methods (or all outbound methods) are marked with
'Skip'.

Result:

This change Brings back the event ordering behavior of 4.x, making
LocalTransportThreadModelTest.testStagedExecution() pass.
This commit is contained in:
Trustin Lee 2014-05-22 11:14:36 +09:00
parent d803a75e2f
commit 00853d9453

View File

@ -36,6 +36,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
static final int MASK_HANDLER_ADDED = 1;
static final int MASK_HANDLER_REMOVED = 1 << 1;
private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
@ -45,6 +46,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;
private static final int MASK_BIND = 1 << 11;
private static final int MASK_CONNECT = 1 << 12;
private static final int MASK_DISCONNECT = 1 << 13;
@ -54,6 +56,25 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private static final int MASK_WRITE = 1 << 17;
private static final int MASK_FLUSH = 1 << 18;
private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT |
MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED |
MASK_CHANNEL_ACTIVE |
MASK_CHANNEL_INACTIVE |
MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE |
MASK_CHANNEL_WRITABILITY_CHANGED |
MASK_USER_EVENT_TRIGGERED;
private static final int MASKGROUP_OUTBOUND = MASK_BIND |
MASK_CONNECT |
MASK_DISCONNECT |
MASK_CLOSE |
MASK_DEREGISTER |
MASK_READ |
MASK_WRITE |
MASK_FLUSH;
/**
* Cache the result of the costly generation of {@link #skipFlags} in the partitioned synchronized
* {@link WeakHashMap}.
@ -97,87 +118,62 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
int flags = 0;
try {
if (handlerType.getMethod(
"handlerAdded", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "handlerAdded")) {
flags |= MASK_HANDLER_ADDED;
}
if (handlerType.getMethod(
"handlerRemoved", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "handlerRemoved")) {
flags |= MASK_HANDLER_REMOVED;
}
if (handlerType.getMethod(
"exceptionCaught", ChannelHandlerContext.class, Throwable.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) {
flags |= MASK_EXCEPTION_CAUGHT;
}
if (handlerType.getMethod(
"channelRegistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "channelRegistered")) {
flags |= MASK_CHANNEL_REGISTERED;
}
if (handlerType.getMethod(
"channelUnregistered", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "channelUnregistered")) {
flags |= MASK_CHANNEL_UNREGISTERED;
}
if (handlerType.getMethod(
"channelActive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "channelActive")) {
flags |= MASK_CHANNEL_ACTIVE;
}
if (handlerType.getMethod(
"channelInactive", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "channelInactive")) {
flags |= MASK_CHANNEL_INACTIVE;
}
if (handlerType.getMethod(
"channelRead", ChannelHandlerContext.class, Object.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "channelRead", Object.class)) {
flags |= MASK_CHANNEL_READ;
}
if (handlerType.getMethod(
"channelReadComplete", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "channelReadComplete")) {
flags |= MASK_CHANNEL_READ_COMPLETE;
}
if (handlerType.getMethod(
"channelWritabilityChanged", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "channelWritabilityChanged")) {
flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (handlerType.getMethod(
"userEventTriggered", ChannelHandlerContext.class, Object.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "userEventTriggered", Object.class)) {
flags |= MASK_USER_EVENT_TRIGGERED;
}
if (handlerType.getMethod(
"bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) {
flags |= MASK_BIND;
}
if (handlerType.getMethod(
"connect", ChannelHandlerContext.class, SocketAddress.class, SocketAddress.class,
ChannelPromise.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) {
flags |= MASK_CONNECT;
}
if (handlerType.getMethod(
"disconnect", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) {
flags |= MASK_DISCONNECT;
}
if (handlerType.getMethod(
"close", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "close", ChannelPromise.class)) {
flags |= MASK_CLOSE;
}
if (handlerType.getMethod(
"deregister", ChannelHandlerContext.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "deregister", ChannelPromise.class)) {
flags |= MASK_DEREGISTER;
}
if (handlerType.getMethod(
"read", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "read")) {
flags |= MASK_READ;
}
if (handlerType.getMethod(
"write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) {
flags |= MASK_WRITE;
// flush() is skipped only when write() is also skipped to avoid the situation where
// flush() is handled by the event loop before write() in staged execution.
if (handlerType.getMethod(
"flush", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_FLUSH;
}
}
if (isSkippable(handlerType, "flush")) {
flags |= MASK_FLUSH;
}
} catch (Exception e) {
// Should never reach here.
@ -187,6 +183,17 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return flags;
}
@SuppressWarnings("rawtypes")
private static boolean isSkippable(
Class<?> handlerType, String methodName, Class<?>... paramTypes) throws Exception {
Class[] newParamTypes = new Class[paramTypes.length + 1];
newParamTypes[0] = ChannelHandlerContext.class;
System.arraycopy(paramTypes, 0, newParamTypes, 1, paramTypes.length);
return handlerType.getMethod(methodName, newParamTypes).isAnnotationPresent(Skip.class);
}
volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev;
@ -305,49 +312,49 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelHandlerContext fireChannelRegistered() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelRegistered(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelUnregistered(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelActive() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelActive(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelInactive() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelInactive(next);
return this;
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
DefaultChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeExceptionCaught(next, cause);
return this;
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object event) {
DefaultChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeUserEventTriggered(next, event);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
DefaultChannelHandlerContext next = findContextInbound();
ReferenceCountUtil.touch(msg, next);
next.invoker().invokeChannelRead(next, msg);
return this;
@ -355,14 +362,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelHandlerContext fireChannelReadComplete() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next);
return this;
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
DefaultChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelWritabilityChanged(next);
return this;
}
@ -399,7 +406,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND);
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeBind(next, localAddress, promise);
return promise;
}
@ -411,7 +418,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
return promise;
}
@ -422,28 +429,28 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return close(promise);
}
DefaultChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeDisconnect(next, promise);
return promise;
}
@Override
public ChannelFuture close(ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeClose(next, promise);
return promise;
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeDeregister(next, promise);
return promise;
}
@Override
public ChannelHandlerContext read() {
DefaultChannelHandlerContext next = findContextOutbound(MASK_READ);
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeRead(next);
return this;
}
@ -455,7 +462,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(MASK_WRITE);
DefaultChannelHandlerContext next = findContextOutbound();
ReferenceCountUtil.touch(msg, next);
next.invoker().invokeWrite(next, msg, promise);
return promise;
@ -463,7 +470,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelHandlerContext flush() {
DefaultChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeFlush(next);
return this;
}
@ -471,10 +478,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
DefaultChannelHandlerContext next;
next = findContextOutbound(MASK_WRITE);
next = findContextOutbound();
ReferenceCountUtil.touch(msg, next);
next.invoker().invokeWrite(next, msg, promise);
next = findContextOutbound(MASK_FLUSH);
next = findContextOutbound();
next.invoker().invokeFlush(next);
return promise;
}
@ -508,19 +515,19 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return new FailedChannelFuture(channel(), executor(), cause);
}
private DefaultChannelHandlerContext findContextInbound(int mask) {
private DefaultChannelHandlerContext findContextInbound() {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.skipFlags & mask) != 0);
} while ((ctx.skipFlags & MASKGROUP_INBOUND) != 0);
return ctx;
}
private DefaultChannelHandlerContext findContextOutbound(int mask) {
private DefaultChannelHandlerContext findContextOutbound() {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while ((ctx.skipFlags & mask) != 0);
} while ((ctx.skipFlags & MASKGROUP_OUTBOUND) != 0);
return ctx;
}