User-definable thread model via ChannelHandlerInvoker

Motivation:

While the default thread model provided by Netty is reasonable enough for most applications, some users might have a special requirement for the thread model.  Here are a few examples:

- A user might want to invoke handlers from the caller thread directly, assuming that his or her application is completely asynchronous and does not make any invocation from non-I/O thread.  In this case, the default invoker implementation will only add the overhead of checking if the current thread is an I/O thread or not.
- A user might want to invoke handlers from different threads depending on the type of events flexibly.

Modifications:

- Backport 132af3a485 which is a fix for #1912
  - Add a new interface called 'ChannelHandlerInvoker' that performs the invocation of event handler methods.
  - Add pipeline manipulation methods that accept ChannelHandlerInvoker
- The differences from the original commit:
  - Separated the irrelevant changes out
  - Channel.eventLoop is null until the registration is complete in this branch, so Channel.Unsafe.invoker() doesn't work before registration.
  - Deregistration is not gone in this branch, so the methods related with deregistration were added to ChannelHandlerInvoker
This commit is contained in:
Trustin Lee 2014-03-24 18:09:27 +09:00
parent 6d4c4d9e4b
commit 844362a947
14 changed files with 1314 additions and 725 deletions

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
@ -68,7 +69,7 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
} }
@Override @Override
protected EventExecutor newChild(Executor executor, Object... args) throws Exception { protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new EpollEventLoop(this, executor, (Integer) args[0]); return new EpollEventLoop(this, executor, (Integer) args[0]);
} }
} }

View File

@ -387,6 +387,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private boolean inFlush0; private boolean inFlush0;
@Override
public final ChannelHandlerInvoker invoker() {
return eventLoop().asInvoker();
}
@Override @Override
public final ChannelOutboundBuffer outboundBuffer() { public final ChannelOutboundBuffer outboundBuffer() {
return outboundBuffer; return outboundBuffer;
@ -673,6 +678,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
flush0(); flush0();
} }
@SuppressWarnings("deprecation")
protected void flush0() { protected void flush0() {
if (inFlush0) { if (inFlush0) {
// Avoid re-entrance // Avoid re-entrance

View File

@ -431,6 +431,7 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
* are only provided to implement the actual transport, and must be invoked from an I/O thread except for the * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
* following methods: * following methods:
* <ul> * <ul>
* <li>{@link #invoker()}</li>
* <li>{@link #localAddress()}</li> * <li>{@link #localAddress()}</li>
* <li>{@link #remoteAddress()}</li> * <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li> * <li>{@link #closeForcibly()}</li>
@ -439,6 +440,12 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
* </ul> * </ul>
*/ */
interface Unsafe { interface Unsafe {
/**
* Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user.
*/
ChannelHandlerInvoker invoker();
/** /**
* Return the {@link SocketAddress} to which is bound local or * Return the {@link SocketAddress} to which is bound local or
* {@code null} if none. * {@code null} if none.

View File

@ -192,7 +192,7 @@ public class ChannelHandlerAppender extends ChannelInboundHandlerAdapter {
} else { } else {
name = e.name; name = e.name;
} }
pipeline.addAfter(dctx.executor, oldName, name, e.handler); pipeline.addAfter(dctx.invoker, oldName, name, e.handler);
} }
} finally { } finally {
if (selfRemoval) { if (selfRemoval) {

View File

@ -0,0 +1,163 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import java.net.SocketAddress;
/**
* Invokes the event handler methods of {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}.
* A user can specify a {@link ChannelHandlerInvoker} to implement a custom thread model unsupported by the default
* implementation.
*/
public interface ChannelHandlerInvoker {
/**
* Returns the {@link EventExecutor} which is used to execute an arbitrary task.
*/
EventExecutor executor();
/**
* Invokes {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelRegistered(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelUnregistered(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelActive(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelInactive(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause);
/**
* Invokes {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is not for
* a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event);
/**
* Invokes {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelRead(ChannelHandlerContext ctx, Object msg);
/**
* Invokes {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelReadComplete(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}. This method is not for
* a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelWritabilityChanged(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise);
/**
* Invokes
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeConnect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#read(ChannelHandlerContext)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeRead(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)} and
* {@link ChannelOutboundHandler#flush(ChannelHandlerContext)} sequentially.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#flush(ChannelHandlerContext)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeFlush(ChannelHandlerContext ctx);
}

View File

@ -0,0 +1,225 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.net.SocketAddress;
import static io.netty.channel.DefaultChannelPipeline.*;
/**
* A set of helper methods for easier implementation of custom {@link ChannelHandlerInvoker} implementation.
*/
public final class ChannelHandlerInvokerUtil {
public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelRegistered(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
@SuppressWarnings("deprecation")
public static void invokeChannelUnregisteredNow(ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelUnregistered(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelActive(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelInactiveNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelInactive(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
@SuppressWarnings("deprecation")
public static void invokeExceptionCaughtNow(final ChannelHandlerContext ctx, final Throwable cause) {
try {
ctx.handler().exceptionCaught(ctx, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by a user handler's exceptionCaught() method:", t);
logger.warn(".. and the cause of the exceptionCaught() was:", cause);
}
}
}
public static void invokeUserEventTriggeredNow(final ChannelHandlerContext ctx, final Object event) {
try {
((ChannelInboundHandler) ctx.handler()).userEventTriggered(ctx, event);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
try {
((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelReadCompleteNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelReadComplete(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelWritabilityChangedNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelWritabilityChanged(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeBindNow(
final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeConnectNow(
final ChannelHandlerContext ctx,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeDisconnectNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeCloseNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).close(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@SuppressWarnings("deprecation")
public static void invokeDeregisterNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).deregister(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeReadNow(final ChannelHandlerContext ctx) {
try {
((ChannelOutboundHandler) ctx.handler()).read(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeFlushNow(final ChannelHandlerContext ctx) {
try {
((ChannelOutboundHandler) ctx.handler()).flush(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeWriteAndFlushNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteNow(ctx, msg, promise);
invokeFlushNow(ctx);
}
private static void notifyHandlerException(ChannelHandlerContext ctx, Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
invokeExceptionCaughtNow(ctx, cause);
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
// only try to fail the promise if its not a VoidChannelPromise, as
// the VoidChannelPromise would also fire the cause through the pipeline
if (promise instanceof VoidChannelPromise) {
return;
}
if (!promise.tryFailure(cause)) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to fail the promise because it's done already: {}", promise, cause);
}
}
}
private static boolean inExceptionCaught(Throwable cause) {
do {
StackTraceElement[] trace = cause.getStackTrace();
if (trace != null) {
for (StackTraceElement t : trace) {
if (t == null) {
break;
}
if ("exceptionCaught".equals(t.getMethodName())) {
return true;
}
}
}
cause = cause.getCause();
} while (cause != null);
return false;
}
private ChannelHandlerInvokerUtil() { }
}

View File

@ -215,8 +215,7 @@ import java.util.NoSuchElementException;
* For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it * For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it
* after the exchange. * after the exchange.
*/ */
public interface ChannelPipeline public interface ChannelPipeline extends Iterable<Entry<String, ChannelHandler>> {
extends Iterable<Entry<String, ChannelHandler>> {
/** /**
* Inserts a {@link ChannelHandler} at the first position of this pipeline. * Inserts a {@link ChannelHandler} at the first position of this pipeline.
@ -246,6 +245,20 @@ public interface ChannelPipeline
*/ */
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param name the name of the handler to insert first
* @param handler the handler to insert first
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified name or handler is {@code null}
*/
ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler);
/** /**
* Appends a {@link ChannelHandler} at the last position of this pipeline. * Appends a {@link ChannelHandler} at the last position of this pipeline.
* *
@ -274,6 +287,20 @@ public interface ChannelPipeline
*/ */
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param name the name of the handler to append
* @param handler the handler to append
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified name or handler is {@code null}
*/
ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler);
/** /**
* Inserts a {@link ChannelHandler} before an existing handler of this * Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline. * pipeline.
@ -310,6 +337,24 @@ public interface ChannelPipeline
*/ */
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert before
* @param handler the handler to insert before
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addBefore(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler);
/** /**
* Inserts a {@link ChannelHandler} after an existing handler of this * Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline. * pipeline.
@ -346,6 +391,24 @@ public interface ChannelPipeline
*/ */
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after
* @param handler the handler to insert after
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addAfter(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler);
/** /**
* Inserts a {@link ChannelHandler}s at the first position of this pipeline. * Inserts a {@link ChannelHandler}s at the first position of this pipeline.
* *
@ -364,6 +427,15 @@ public interface ChannelPipeline
*/ */
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
/**
* Inserts a {@link ChannelHandler}s at the first position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param handlers the handlers to insert first
*
*/
ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers);
/** /**
* Inserts a {@link ChannelHandler}s at the last position of this pipeline. * Inserts a {@link ChannelHandler}s at the last position of this pipeline.
* *
@ -382,6 +454,15 @@ public interface ChannelPipeline
*/ */
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
/**
* Inserts a {@link ChannelHandler}s at the last position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param handlers the handlers to insert last
*
*/
ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers);
/** /**
* Removes the specified {@link ChannelHandler} from this pipeline. * Removes the specified {@link ChannelHandler} from this pipeline.
* *

View File

@ -18,18 +18,13 @@ package io.netty.channel;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Attribute; import io.netty.util.Attribute;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint; import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.net.SocketAddress; import java.net.SocketAddress;
import static io.netty.channel.DefaultChannelPipeline.*;
final class DefaultChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { final class DefaultChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext next;
@ -43,21 +38,19 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private final ChannelHandler handler; private final ChannelHandler handler;
private boolean removed; private boolean removed;
// Will be set to null if no child executor should be used, otherwise it will be set to the final ChannelHandlerInvoker invoker;
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture; private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor. // Lazily instantiated tasks used to trigger events to a handler with different executor.
// These needs to be volatile as otherwise an other Thread may see an half initialized instance. // These needs to be volatile as otherwise an other Thread may see an half initialized instance.
// See the JMM for more details // See the JMM for more details
private volatile Runnable invokeChannelReadCompleteTask; volatile Runnable invokeChannelReadCompleteTask;
private volatile Runnable invokeReadTask; volatile Runnable invokeReadTask;
private volatile Runnable invokeChannelWritableStateChangedTask; volatile Runnable invokeChannelWritableStateChangedTask;
private volatile Runnable invokeFlushTask; volatile Runnable invokeFlushTask;
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, DefaultChannelHandlerContext(
ChannelHandler handler) { DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
if (name == null) { if (name == null) {
throw new NullPointerException("name"); throw new NullPointerException("name");
@ -70,19 +63,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
this.pipeline = pipeline; this.pipeline = pipeline;
this.name = name; this.name = name;
this.handler = handler; this.handler = handler;
this.invoker = invoker;
if (group != null) {
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = pipeline.childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
pipeline.childExecutors.put(group, childExecutor);
}
executor = childExecutor;
} else {
executor = null;
}
inbound = handler instanceof ChannelInboundHandler; inbound = handler instanceof ChannelInboundHandler;
outbound = handler instanceof ChannelOutboundHandler; outbound = handler instanceof ChannelOutboundHandler;
@ -130,10 +111,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override @Override
public EventExecutor executor() { public EventExecutor executor() {
if (executor == null) { return invoker().executor();
return channel().eventLoop(); }
public ChannelHandlerInvoker invoker() {
if (invoker == null) {
return channel.unsafe().invoker();
} else { } else {
return executor; return invoker;
} }
} }
@ -154,265 +139,68 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override @Override
public ChannelHandlerContext fireChannelRegistered() { public ChannelHandlerContext fireChannelRegistered() {
final DefaultChannelHandlerContext next = findContextInbound(); DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); next.invoker().invokeChannelRegistered(next);
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
return this; return this;
} }
private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelHandlerContext fireChannelUnregistered() { public ChannelHandlerContext fireChannelUnregistered() {
final DefaultChannelHandlerContext next = findContextInbound(); DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); next.invoker().invokeChannelUnregistered(next);
if (executor.inEventLoop()) {
next.invokeChannelUnregistered();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelUnregistered();
}
});
}
return this; return this;
} }
@SuppressWarnings("deprecation")
private void invokeChannelUnregistered() {
try {
((ChannelInboundHandler) handler).channelUnregistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelHandlerContext fireChannelActive() { public ChannelHandlerContext fireChannelActive() {
final DefaultChannelHandlerContext next = findContextInbound(); DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); next.invoker().invokeChannelActive(next);
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
return this; return this;
} }
private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelHandlerContext fireChannelInactive() { public ChannelHandlerContext fireChannelInactive() {
final DefaultChannelHandlerContext next = findContextInbound(); DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); next.invoker().invokeChannelInactive(next);
if (executor.inEventLoop()) {
next.invokeChannelInactive();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelInactive();
}
});
}
return this; return this;
} }
private void invokeChannelInactive() {
try {
((ChannelInboundHandler) handler).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
if (cause == null) { DefaultChannelHandlerContext next = this.next;
throw new NullPointerException("cause"); next.invoker().invokeExceptionCaught(next, cause);
}
final DefaultChannelHandlerContext next = this.next;
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
return this; return this;
} }
@SuppressWarnings("deprecation")
private void invokeExceptionCaught(final Throwable cause) {
try {
handler.exceptionCaught(this, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
}
}
}
@Override @Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) { public ChannelHandlerContext fireUserEventTriggered(Object event) {
if (event == null) { DefaultChannelHandlerContext next = findContextInbound();
throw new NullPointerException("event"); next.invoker().invokeUserEventTriggered(next, event);
}
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeUserEventTriggered(event);
}
});
}
return this; return this;
} }
private void invokeUserEventTriggered(Object event) {
try {
((ChannelInboundHandler) handler).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelHandlerContext fireChannelRead(final Object msg) { public ChannelHandlerContext fireChannelRead(Object msg) {
if (msg == null) { DefaultChannelHandlerContext next = findContextInbound();
throw new NullPointerException("msg");
}
final DefaultChannelHandlerContext next = findContextInbound();
ReferenceCountUtil.touch(msg, next); ReferenceCountUtil.touch(msg, next);
EventExecutor executor = next.executor(); next.invoker().invokeChannelRead(next, msg);
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(msg);
}
});
}
return this; return this;
} }
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelHandlerContext fireChannelReadComplete() { public ChannelHandlerContext fireChannelReadComplete() {
final DefaultChannelHandlerContext next = findContextInbound(); DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); next.invoker().invokeChannelReadComplete(next);
if (executor.inEventLoop()) {
next.invokeChannelReadComplete();
} else {
Runnable task = next.invokeChannelReadCompleteTask;
if (task == null) {
next.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
next.invokeChannelReadComplete();
}
};
}
executor.execute(task);
}
return this; return this;
} }
private void invokeChannelReadComplete() {
try {
((ChannelInboundHandler) handler).channelReadComplete(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelHandlerContext fireChannelWritabilityChanged() { public ChannelHandlerContext fireChannelWritabilityChanged() {
final DefaultChannelHandlerContext next = findContextInbound(); DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); next.invoker().invokeChannelWritabilityChanged(next);
if (executor.inEventLoop()) {
next.invokeChannelWritabilityChanged();
} else {
Runnable task = next.invokeChannelWritableStateChangedTask;
if (task == null) {
next.invokeChannelWritableStateChangedTask = task = new Runnable() {
@Override
public void run() {
next.invokeChannelWritabilityChanged();
}
};
}
executor.execute(task);
}
return this; return this;
} }
private void invokeChannelWritabilityChanged() {
try {
((ChannelInboundHandler) handler).channelWritabilityChanged(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelFuture bind(SocketAddress localAddress) { public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, newPromise()); return bind(localAddress, newPromise());
@ -445,294 +233,81 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
@Override @Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) { DefaultChannelHandlerContext next = findContextOutbound();
throw new NullPointerException("localAddress"); next.invoker().invokeBind(next, localAddress, promise);
}
validatePromise(promise, false);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise; return promise;
} }
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override @Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise); return connect(remoteAddress, null, promise);
} }
@Override @Override
public ChannelFuture connect( public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validatePromise(promise, false);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise; return promise;
} }
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override @Override
public ChannelFuture disconnect(final ChannelPromise promise) { public ChannelFuture disconnect(ChannelPromise promise) {
validatePromise(promise, false); if (!channel().metadata().hasDisconnect()) {
return close(promise);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior.
if (!channel().metadata().hasDisconnect()) {
next.invokeClose(promise);
} else {
next.invokeDisconnect(promise);
}
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
if (!channel().metadata().hasDisconnect()) {
next.invokeClose(promise);
} else {
next.invokeDisconnect(promise);
}
}
}, promise, null);
} }
DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeDisconnect(next, promise);
return promise; return promise;
} }
private void invokeDisconnect(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).disconnect(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override @Override
public ChannelFuture close(final ChannelPromise promise) { public ChannelFuture close(ChannelPromise promise) {
validatePromise(promise, false); DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeClose(next, promise);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeClose(promise);
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeClose(promise);
}
}, promise, null);
}
return promise; return promise;
} }
private void invokeClose(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).close(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override @Override
public ChannelFuture deregister(final ChannelPromise promise) { public ChannelFuture deregister(ChannelPromise promise) {
validatePromise(promise, false); DefaultChannelHandlerContext next = findContextOutbound();
next.invoker().invokeDeregister(next, promise);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeDeregister(promise);
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeDeregister(promise);
}
}, promise, null);
}
return promise; return promise;
} }
@SuppressWarnings("deprecation")
private void invokeDeregister(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).deregister(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override @Override
public ChannelHandlerContext read() { public ChannelHandlerContext read() {
final DefaultChannelHandlerContext next = findContextOutbound(); DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); next.invoker().invokeRead(next);
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this; return this;
} }
private void invokeRead() {
try {
((ChannelOutboundHandler) handler).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelFuture write(Object msg) { public ChannelFuture write(Object msg) {
return write(msg, newPromise()); return write(msg, newPromise());
} }
@Override @Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) { public ChannelFuture write(Object msg, ChannelPromise promise) {
if (msg == null) { DefaultChannelHandlerContext next = findContextOutbound();
throw new NullPointerException("msg"); ReferenceCountUtil.touch(msg, next);
} next.invoker().invokeWrite(next, msg, promise);
validatePromise(promise, true);
write(msg, false, promise);
return promise; return promise;
} }
private void invokeWrite(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override @Override
public ChannelHandlerContext flush() { public ChannelHandlerContext flush() {
final DefaultChannelHandlerContext next = findContextOutbound(); DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); next.invoker().invokeFlush(next);
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel.voidPromise(), null);
}
return this; return this;
} }
private void invokeFlush() {
try {
((ChannelOutboundHandler) handler).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override @Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(promise, true);
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(); DefaultChannelHandlerContext next = findContextOutbound();
ReferenceCountUtil.touch(msg, next); ReferenceCountUtil.touch(msg, next);
EventExecutor executor = next.executor(); next.invoker().invokeWriteAndFlush(next, msg, promise);
if (executor.inEventLoop()) { return promise;
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
Runnable task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, msg, size, promise);
} else {
task = WriteTask.newInstance(next, msg, size, promise);
}
safeExecute(executor, task, promise, msg);
}
} }
@Override @Override
@ -740,53 +315,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return writeAndFlush(msg, newPromise()); return writeAndFlush(msg, newPromise());
} }
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
// only try to fail the promise if its not a VoidChannelPromise, as
// the VoidChannelPromise would also fire the cause through the pipeline
if (promise instanceof VoidChannelPromise) {
return;
}
if (!promise.tryFailure(cause)) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to fail the promise because it's done already: {}", promise, cause);
}
}
}
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
invokeExceptionCaught(cause);
}
private static boolean inExceptionCaught(Throwable cause) {
do {
StackTraceElement[] trace = cause.getStackTrace();
if (trace != null) {
for (StackTraceElement t : trace) {
if (t == null) {
break;
}
if ("exceptionCaught".equals(t.getMethodName())) {
return true;
}
}
}
cause = cause.getCause();
} while (cause != null);
return false;
}
@Override @Override
public ChannelPromise newPromise() { public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor()); return new DefaultChannelPromise(channel(), executor());
@ -811,35 +339,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return new FailedChannelFuture(channel(), executor(), cause); return new FailedChannelFuture(channel(), executor(), cause);
} }
private void validatePromise(ChannelPromise promise, boolean allowVoidPromise) {
if (promise == null) {
throw new NullPointerException("promise");
}
if (promise.isDone()) {
throw new IllegalArgumentException("promise already done: " + promise);
}
if (promise.channel() != channel()) {
throw new IllegalArgumentException(String.format(
"promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
}
if (promise.getClass() == DefaultChannelPromise.class) {
return;
}
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
}
if (promise instanceof AbstractChannel.CloseFuture) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
}
}
private DefaultChannelHandlerContext findContextInbound() { private DefaultChannelHandlerContext findContextInbound() {
DefaultChannelHandlerContext ctx = this; DefaultChannelHandlerContext ctx = this;
do { do {
@ -870,126 +369,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return removed; return removed;
} }
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
executor.execute(runnable);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}
abstract static class AbstractWriteTask<T> extends OneTimeTask {
private final Recycler.Handle<T> handle;
private DefaultChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
private int size;
private AbstractWriteTask(Recycler.Handle<T> handle) {
this.handle = handle;
}
protected static void init(AbstractWriteTask<?> task, DefaultChannelHandlerContext ctx,
Object msg, int size, ChannelPromise promise) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = size;
}
@Override
public final void run() {
try {
if (size > 0) {
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
write(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
recycle(handle);
}
}
protected void write(DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}
protected abstract void recycle(Recycler.Handle<T> handle);
}
static final class WriteTask
extends AbstractWriteTask<WriteTask> implements SingleThreadEventLoop.NonWakeupRunnable {
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override
protected WriteTask newObject(Handle<WriteTask> handle) {
return new WriteTask(handle);
}
};
private static WriteTask newInstance(
DefaultChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteTask task = RECYCLER.get();
init(task, ctx, msg, size, promise);
return task;
}
private WriteTask(Recycler.Handle<WriteTask> handle) {
super(handle);
}
@Override
protected void recycle(Recycler.Handle<WriteTask> handle) {
RECYCLER.recycle(this, handle);
}
}
static final class WriteAndFlushTask extends AbstractWriteTask<WriteAndFlushTask> {
private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
@Override
protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
return new WriteAndFlushTask(handle);
}
};
private static WriteAndFlushTask newInstance(
DefaultChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteAndFlushTask task = RECYCLER.get();
init(task, ctx, msg, size, promise);
return task;
}
private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
super(handle);
}
@Override
public void write(DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise);
ctx.invokeFlush();
}
@Override
protected void recycle(Recycler.Handle<WriteAndFlushTask> handle) {
RECYCLER.recycle(this, handle);
}
}
@Override @Override
public String toHintString() { public String toHintString() {
return '\'' + name + "' will handle the message from this point."; return '\'' + name + "' will handle the message from this point.";

View File

@ -0,0 +1,544 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import static io.netty.channel.ChannelHandlerInvokerUtil.*;
import static io.netty.channel.DefaultChannelPipeline.*;
public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
private final EventExecutor executor;
public DefaultChannelHandlerInvoker(EventExecutor executor) {
if (executor == null) {
throw new NullPointerException("executor");
}
this.executor = executor;
}
@Override
public EventExecutor executor() {
return executor;
}
@Override
public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelRegisteredNow(ctx);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeChannelRegisteredNow(ctx);
}
});
}
}
@Override
public void invokeChannelUnregistered(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelUnregisteredNow(ctx);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeChannelUnregisteredNow(ctx);
}
});
}
}
@Override
public void invokeChannelActive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelActiveNow(ctx);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeChannelActiveNow(ctx);
}
});
}
}
@Override
public void invokeChannelInactive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelInactiveNow(ctx);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeChannelInactiveNow(ctx);
}
});
}
}
@Override
public void invokeExceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
if (executor.inEventLoop()) {
invokeExceptionCaughtNow(ctx, cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
invokeExceptionCaughtNow(ctx, cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}
@Override
public void invokeUserEventTriggered(final ChannelHandlerContext ctx, final Object event) {
if (event == null) {
throw new NullPointerException("event");
}
if (executor.inEventLoop()) {
invokeUserEventTriggeredNow(ctx, event);
} else {
safeExecuteInbound(new Runnable() {
@Override
public void run() {
invokeUserEventTriggeredNow(ctx, event);
}
}, event);
}
}
@Override
public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (executor.inEventLoop()) {
invokeChannelReadNow(ctx, msg);
} else {
safeExecuteInbound(new Runnable() {
@Override
public void run() {
invokeChannelReadNow(ctx, msg);
}
}, msg);
}
}
@Override
public void invokeChannelReadComplete(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelReadCompleteNow(ctx);
} else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
Runnable task = dctx.invokeChannelReadCompleteTask;
if (task == null) {
dctx.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
invokeChannelReadCompleteNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeChannelWritabilityChanged(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelWritabilityChangedNow(ctx);
} else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
Runnable task = dctx.invokeChannelWritableStateChangedTask;
if (task == null) {
dctx.invokeChannelWritableStateChangedTask = task = new Runnable() {
@Override
public void run() {
invokeChannelWritabilityChangedNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeBind(
final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeBindNow(ctx, localAddress, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeBindNow(ctx, localAddress, promise);
}
}, promise);
}
}
@Override
public void invokeConnect(
final ChannelHandlerContext ctx,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
}, promise);
}
}
@Override
public void invokeDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeDisconnectNow(ctx, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeDisconnectNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeClose(final ChannelHandlerContext ctx, final ChannelPromise promise) {
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeCloseNow(ctx, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeCloseNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeDeregister(final ChannelHandlerContext ctx, final ChannelPromise promise) {
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeDeregisterNow(ctx, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeDeregisterNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeRead(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeReadNow(ctx);
} else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
Runnable task = dctx.invokeReadTask;
if (task == null) {
dctx.invokeReadTask = task = new Runnable() {
@Override
public void run() {
invokeReadNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(ctx, promise, true);
invokeWrite(ctx, msg, false, promise);
}
private void invokeWrite(ChannelHandlerContext ctx, Object msg, boolean flush, ChannelPromise promise) {
if (executor.inEventLoop()) {
invokeWriteNow(ctx, msg, promise);
if (flush) {
invokeFlushNow(ctx);
}
} else {
AbstractChannel channel = (AbstractChannel) ctx.channel();
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
Runnable task;
if (flush) {
task = WriteAndFlushTask.newInstance(ctx, msg, size, promise);
} else {
task = WriteTask.newInstance(ctx, msg, size, promise);
}
safeExecuteOutbound(task, promise, msg);
}
}
@Override
public void invokeFlush(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeFlushNow(ctx);
} else {
DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx;
Runnable task = dctx.invokeFlushTask;
if (task == null) {
dctx.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
invokeFlushNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
validatePromise(ctx, promise, true);
invokeWrite(ctx, msg, true, promise);
}
private static void validatePromise(ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) {
if (ctx == null) {
throw new NullPointerException("ctx");
}
if (promise == null) {
throw new NullPointerException("promise");
}
if (promise.isDone()) {
throw new IllegalArgumentException("promise already done: " + promise);
}
if (promise.channel() != ctx.channel()) {
throw new IllegalArgumentException(String.format(
"promise.channel does not match: %s (expected: %s)", promise.channel(), ctx.channel()));
}
if (promise.getClass() == DefaultChannelPromise.class) {
return;
}
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
}
if (promise instanceof AbstractChannel.CloseFuture) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
}
}
private void safeExecuteInbound(Runnable task, Object msg) {
boolean success = false;
try {
executor.execute(task);
success = true;
} finally {
if (!success) {
ReferenceCountUtil.release(msg);
}
}
}
private void safeExecuteOutbound(Runnable task, ChannelPromise promise) {
try {
executor.execute(task);
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) {
try {
executor.execute(task);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
abstract static class AbstractWriteTask<T> extends OneTimeTask {
private final Recycler.Handle<T> handle;
private ChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
private int size;
protected AbstractWriteTask(Recycler.Handle<T> handle) {
this.handle = handle;
}
protected static void init(
AbstractWriteTask<?> task, ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = size;
}
@Override
public final void run() {
try {
if (size > 0) {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
write(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
recycle(handle);
}
}
protected void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteNow(ctx, msg, promise);
}
protected abstract void recycle(Recycler.Handle<T> handle);
}
static final class WriteTask
extends AbstractWriteTask<WriteTask> implements SingleThreadEventLoop.NonWakeupRunnable {
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override
protected WriteTask newObject(Handle<WriteTask> handle) {
return new WriteTask(handle);
}
};
static WriteTask newInstance(ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteTask task = RECYCLER.get();
init(task, ctx, msg, size, promise);
return task;
}
private WriteTask(Recycler.Handle<WriteTask> handle) {
super(handle);
}
@Override
protected void recycle(Recycler.Handle<WriteTask> handle) {
RECYCLER.recycle(this, handle);
}
}
static final class WriteAndFlushTask extends AbstractWriteTask<WriteAndFlushTask> {
private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
@Override
protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
return new WriteAndFlushTask(handle);
}
};
static WriteAndFlushTask newInstance(
ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteAndFlushTask task = RECYCLER.get();
init(task, ctx, msg, size, promise);
return task;
}
private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
super(handle);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise);
invokeFlushNow(ctx);
}
@Override
protected void recycle(Recycler.Handle<WriteAndFlushTask> handle) {
RECYCLER.recycle(this, handle);
}
}
}

View File

@ -36,6 +36,7 @@ import java.util.NoSuchElementException;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/** /**
* The default {@link ChannelPipeline} implementation. It is usually created * The default {@link ChannelPipeline} implementation. It is usually created
@ -49,10 +50,21 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private static final WeakHashMap<Class<?>, String>[] nameCaches = private static final WeakHashMap<Class<?>, String>[] nameCaches =
new WeakHashMap[Runtime.getRuntime().availableProcessors()]; new WeakHashMap[Runtime.getRuntime().availableProcessors()];
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, Map> childInvokersUpdater;
static { static {
for (int i = 0; i < nameCaches.length; i ++) { for (int i = 0; i < nameCaches.length; i ++) {
nameCaches[i] = new WeakHashMap<Class<?>, String>(); nameCaches[i] = new WeakHashMap<Class<?>, String>();
} }
@SuppressWarnings("rawtypes")
AtomicReferenceFieldUpdater<DefaultChannelPipeline, Map> updater;
updater = PlatformDependent.newAtomicReferenceFieldUpdater(DefaultChannelPipeline.class, "childInvokers");
if (updater == null) {
updater = AtomicReferenceFieldUpdater.newUpdater(DefaultChannelPipeline.class, Map.class, "childInvokers");
}
childInvokersUpdater = updater;
} }
final AbstractChannel channel; final AbstractChannel channel;
@ -63,8 +75,13 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private final Map<String, DefaultChannelHandlerContext> name2ctx = private final Map<String, DefaultChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4); new HashMap<String, DefaultChannelHandlerContext>(4);
final Map<EventExecutorGroup, EventExecutor> childExecutors = /**
new IdentityHashMap<EventExecutorGroup, EventExecutor>(); * Updated by {@link #childInvokersUpdater}.
*
* @see #findInvoker(EventExecutorGroup)
*/
@SuppressWarnings("UnusedDeclaration")
private volatile Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
DefaultChannelPipeline(AbstractChannel channel) { DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) { if (channel == null) {
@ -89,17 +106,20 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addFirst(String name, ChannelHandler handler) { public ChannelPipeline addFirst(String name, ChannelHandler handler) {
return addFirst(null, name, handler); return addFirst((ChannelHandlerInvoker) null, name, handler);
} }
@Override @Override
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) { public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
return addFirst(findInvoker(group), name, handler);
}
@Override
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {
synchronized (this) { synchronized (this) {
checkDuplicateName(name); checkDuplicateName(name);
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addFirst0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
addFirst0(name, newCtx);
} }
return this; return this;
} }
@ -119,18 +139,20 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addLast(String name, ChannelHandler handler) { public ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler); return addLast((ChannelHandlerInvoker) null, name, handler);
} }
@Override @Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
return addLast(findInvoker(group), name, handler);
}
@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {
synchronized (this) { synchronized (this) {
checkDuplicateName(name); checkDuplicateName(name);
addLast0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
} }
return this; return this;
} }
@ -150,17 +172,21 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
return addBefore(null, baseName, name, handler); return addBefore((ChannelHandlerInvoker) null, baseName, name, handler);
}
@Override
public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
return addBefore(findInvoker(group), baseName, name, handler);
} }
@Override @Override
public ChannelPipeline addBefore( public ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
synchronized (this) { synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName); DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name); checkDuplicateName(name);
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
addBefore0(name, ctx, newCtx);
} }
return this; return this;
} }
@ -180,20 +206,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
return addAfter(null, baseName, name, handler); return addAfter((ChannelHandlerInvoker) null, baseName, name, handler);
}
@Override
public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
return addAfter(findInvoker(group), baseName, name, handler);
} }
@Override @Override
public ChannelPipeline addAfter( public ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
synchronized (this) { synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName); DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name); checkDuplicateName(name);
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
addAfter0(name, ctx, newCtx);
} }
return this; return this;
} }
@ -213,11 +241,16 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addFirst(ChannelHandler... handlers) { public ChannelPipeline addFirst(ChannelHandler... handlers) {
return addFirst(null, handlers); return addFirst((ChannelHandlerInvoker) null, handlers);
} }
@Override @Override
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) {
return addFirst(findInvoker(group), handlers);
}
@Override
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
if (handlers == null) { if (handlers == null) {
throw new NullPointerException("handlers"); throw new NullPointerException("handlers");
} }
@ -234,7 +267,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
for (int i = size - 1; i >= 0; i --) { for (int i = size - 1; i >= 0; i --) {
ChannelHandler h = handlers[i]; ChannelHandler h = handlers[i];
addFirst(executor, generateName(h), h); addFirst(invoker, generateName(h), h);
} }
return this; return this;
@ -242,11 +275,16 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addLast(ChannelHandler... handlers) { public ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers); return addLast((ChannelHandlerInvoker) null, handlers);
} }
@Override @Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) {
return addLast(findInvoker(group), handlers);
}
@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
if (handlers == null) { if (handlers == null) {
throw new NullPointerException("handlers"); throw new NullPointerException("handlers");
} }
@ -255,12 +293,45 @@ final class DefaultChannelPipeline implements ChannelPipeline {
if (h == null) { if (h == null) {
break; break;
} }
addLast(executor, generateName(h), h); addLast(invoker, generateName(h), h);
} }
return this; return this;
} }
private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
if (group == null) {
return null;
}
// Lazily initialize the data structure that maps an EventExecutorGroup to a ChannelHandlerInvoker.
Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers = this.childInvokers;
if (childInvokers == null) {
childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>();
if (!childInvokersUpdater.compareAndSet(this, null, childInvokers)) {
childInvokers = this.childInvokers;
}
}
// Pick one of the child executors and remember its invoker
// so that the same invoker is used to fire events for the same channel.
ChannelHandlerInvoker invoker;
synchronized (childInvokers) {
invoker = childInvokers.get(group);
if (invoker == null) {
EventExecutor executor = group.next();
if (executor instanceof EventLoop) {
invoker = ((EventLoop) executor).asInvoker();
} else {
invoker = new DefaultChannelHandlerInvoker(executor);
}
childInvokers.put(group, invoker);
}
}
return invoker;
}
String generateName(ChannelHandler handler) { String generateName(ChannelHandler handler) {
WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)]; WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)];
Class<?> handlerType = handler.getClass(); Class<?> handlerType = handler.getClass();
@ -396,7 +467,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
final DefaultChannelHandlerContext newCtx = final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, ctx.executor, newName, newHandler); new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
replace0(ctx, newName, newCtx); replace0(ctx, newName, newCtx);

View File

@ -27,4 +27,10 @@ import io.netty.util.concurrent.EventExecutor;
public interface EventLoop extends EventExecutor, EventLoopGroup { public interface EventLoop extends EventExecutor, EventLoopGroup {
@Override @Override
EventLoopGroup parent(); EventLoopGroup parent();
/**
* Creates a new default {@link ChannelHandlerInvoker} implementation that uses this {@link EventLoop} to
* invoke event handler methods.
*/
ChannelHandlerInvoker asInvoker();
} }

View File

@ -26,6 +26,8 @@ import java.util.concurrent.ThreadFactory;
*/ */
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
private final ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(this);
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp); super(parent, threadFactory, addTaskWakesUp);
} }
@ -44,6 +46,11 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return (EventLoop) super.next(); return (EventLoop) super.next();
} }
@Override
public ChannelHandlerInvoker asInvoker() {
return invoker;
}
@Override @Override
public ChannelFuture register(Channel channel) { public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this)); return register(channel, new DefaultChannelPromise(channel, this));

View File

@ -18,16 +18,21 @@ package io.netty.channel.embedded;
import io.netty.channel.AbstractEventLoop; import io.netty.channel.AbstractEventLoop;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import java.net.SocketAddress;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
final class EmbeddedEventLoop extends AbstractEventLoop { import static io.netty.channel.ChannelHandlerInvokerUtil.*;
final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandlerInvoker {
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2); private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
@ -82,9 +87,7 @@ final class EmbeddedEventLoop extends AbstractEventLoop {
} }
@Override @Override
public boolean awaitTermination(long timeout, TimeUnit unit) public boolean awaitTermination(long timeout, TimeUnit unit) {
throws InterruptedException {
Thread.sleep(unit.toMillis(timeout));
return false; return false;
} }
@ -110,7 +113,103 @@ final class EmbeddedEventLoop extends AbstractEventLoop {
} }
@Override @Override
public EventLoop next() { public ChannelHandlerInvoker asInvoker() {
return this; return this;
} }
@Override
public EventExecutor executor() {
return this;
}
@Override
public void invokeChannelRegistered(ChannelHandlerContext ctx) {
invokeChannelRegisteredNow(ctx);
}
@Override
public void invokeChannelActive(ChannelHandlerContext ctx) {
invokeChannelActiveNow(ctx);
}
@Override
public void invokeChannelInactive(ChannelHandlerContext ctx) {
invokeChannelInactiveNow(ctx);
}
@Override
public void invokeChannelUnregistered(ChannelHandlerContext ctx) {
invokeChannelUnregisteredNow(ctx);
}
@Override
public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
invokeExceptionCaughtNow(ctx, cause);
}
@Override
public void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event) {
invokeUserEventTriggeredNow(ctx, event);
}
@Override
public void invokeChannelRead(ChannelHandlerContext ctx, Object msg) {
invokeChannelReadNow(ctx, msg);
}
@Override
public void invokeChannelReadComplete(ChannelHandlerContext ctx) {
invokeChannelReadCompleteNow(ctx);
}
@Override
public void invokeChannelWritabilityChanged(ChannelHandlerContext ctx) {
invokeChannelWritabilityChangedNow(ctx);
}
@Override
public void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
invokeBindNow(ctx, localAddress, promise);
}
@Override
public void invokeConnect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
@Override
public void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeDisconnectNow(ctx, promise);
}
@Override
public void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeCloseNow(ctx, promise);
}
@Override
public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeDeregisterNow(ctx, promise);
}
@Override
public void invokeRead(ChannelHandlerContext ctx) {
invokeReadNow(ctx);
}
@Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteNow(ctx, msg, promise);
}
@Override
public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteAndFlushNow(ctx, msg, promise);
}
@Override
public void invokeFlush(ChannelHandlerContext ctx) {
invokeFlushNow(ctx);
}
} }

View File

@ -16,6 +16,7 @@
package io.netty.channel.nio; package io.netty.channel.nio;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
@ -92,8 +93,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
} }
@Override @Override
protected EventExecutor newChild( protected EventLoop newChild(Executor executor, Object... args) throws Exception {
Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0]); return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
} }
} }