diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java index f4a7867375..101312c611 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -15,6 +15,7 @@ */ package io.netty.channel.epoll; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.util.concurrent.EventExecutor; @@ -68,7 +69,7 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { } @Override - protected EventExecutor newChild(Executor executor, Object... args) throws Exception { + protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new EpollEventLoop(this, executor, (Integer) args[0]); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index c61fef6848..c94bee5ac0 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -387,6 +387,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private boolean inFlush0; + @Override + public final ChannelHandlerInvoker invoker() { + return eventLoop().asInvoker(); + } + @Override public final ChannelOutboundBuffer outboundBuffer() { return outboundBuffer; @@ -673,6 +678,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha flush0(); } + @SuppressWarnings("deprecation") protected void flush0() { if (inFlush0) { // Avoid re-entrance diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 3c3f62359f..f34beb5ec9 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -431,6 +431,7 @@ public interface Channel extends AttributeMap, Comparable { * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the * following methods: * */ interface Unsafe { + + /** + * Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user. + */ + ChannelHandlerInvoker invoker(); + /** * Return the {@link SocketAddress} to which is bound local or * {@code null} if none. diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java index 2ddf769a6b..57531f37f9 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java @@ -192,7 +192,7 @@ public class ChannelHandlerAppender extends ChannelInboundHandlerAdapter { } else { name = e.name; } - pipeline.addAfter(dctx.executor, oldName, name, e.handler); + pipeline.addAfter(dctx.invoker, oldName, name, e.handler); } } finally { if (selfRemoval) { diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java new file mode 100644 index 0000000000..463c71987c --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerInvoker.java @@ -0,0 +1,163 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.util.concurrent.EventExecutor; + +import java.net.SocketAddress; + +/** + * Invokes the event handler methods of {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}. + * A user can specify a {@link ChannelHandlerInvoker} to implement a custom thread model unsupported by the default + * implementation. + */ +public interface ChannelHandlerInvoker { + + /** + * Returns the {@link EventExecutor} which is used to execute an arbitrary task. + */ + EventExecutor executor(); + + /** + * Invokes {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelRegistered(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelUnregistered(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelActive(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelInactive(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause); + + /** + * Invokes {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is not for + * a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event); + + /** + * Invokes {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelRead(ChannelHandlerContext ctx, Object msg); + + /** + * Invokes {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)}. This method is not for a user + * but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelReadComplete(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}. This method is not for + * a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in + * {@link ChannelHandlerContext} instead. + */ + void invokeChannelWritabilityChanged(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise); + + /** + * Invokes + * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeConnect( + ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#read(ChannelHandlerContext)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeRead(ChannelHandlerContext ctx); + + /** + * Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)} and + * {@link ChannelOutboundHandler#flush(ChannelHandlerContext)} sequentially. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise); + + /** + * Invokes {@link ChannelOutboundHandler#flush(ChannelHandlerContext)}. + * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. + * To trigger an event, use the methods in {@link ChannelHandlerContext} instead. + */ + void invokeFlush(ChannelHandlerContext ctx); +} diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java new file mode 100644 index 0000000000..2b9df27eef --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerInvokerUtil.java @@ -0,0 +1,225 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import java.net.SocketAddress; + +import static io.netty.channel.DefaultChannelPipeline.*; + +/** + * A set of helper methods for easier implementation of custom {@link ChannelHandlerInvoker} implementation. + */ +public final class ChannelHandlerInvokerUtil { + + public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelRegistered(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + @SuppressWarnings("deprecation") + public static void invokeChannelUnregisteredNow(ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelUnregistered(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelActive(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelInactiveNow(final ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelInactive(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + @SuppressWarnings("deprecation") + public static void invokeExceptionCaughtNow(final ChannelHandlerContext ctx, final Throwable cause) { + try { + ctx.handler().exceptionCaught(ctx, cause); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("An exception was thrown by a user handler's exceptionCaught() method:", t); + logger.warn(".. and the cause of the exceptionCaught() was:", cause); + } + } + } + + public static void invokeUserEventTriggeredNow(final ChannelHandlerContext ctx, final Object event) { + try { + ((ChannelInboundHandler) ctx.handler()).userEventTriggered(ctx, event); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) { + try { + ((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelReadCompleteNow(final ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelReadComplete(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeChannelWritabilityChangedNow(final ChannelHandlerContext ctx) { + try { + ((ChannelInboundHandler) ctx.handler()).channelWritabilityChanged(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeBindNow( + final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + public static void invokeConnectNow( + final ChannelHandlerContext ctx, + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + public static void invokeDisconnectNow(final ChannelHandlerContext ctx, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + public static void invokeCloseNow(final ChannelHandlerContext ctx, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).close(ctx, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + @SuppressWarnings("deprecation") + public static void invokeDeregisterNow(final ChannelHandlerContext ctx, final ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).deregister(ctx, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + public static void invokeReadNow(final ChannelHandlerContext ctx) { + try { + ((ChannelOutboundHandler) ctx.handler()).read(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + try { + ((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } + + public static void invokeFlushNow(final ChannelHandlerContext ctx) { + try { + ((ChannelOutboundHandler) ctx.handler()).flush(ctx); + } catch (Throwable t) { + notifyHandlerException(ctx, t); + } + } + + public static void invokeWriteAndFlushNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + invokeWriteNow(ctx, msg, promise); + invokeFlushNow(ctx); + } + + private static void notifyHandlerException(ChannelHandlerContext ctx, Throwable cause) { + if (inExceptionCaught(cause)) { + if (logger.isWarnEnabled()) { + logger.warn( + "An exception was thrown by a user handler " + + "while handling an exceptionCaught event", cause); + } + return; + } + + invokeExceptionCaughtNow(ctx, cause); + } + + private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { + // only try to fail the promise if its not a VoidChannelPromise, as + // the VoidChannelPromise would also fire the cause through the pipeline + if (promise instanceof VoidChannelPromise) { + return; + } + + if (!promise.tryFailure(cause)) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to fail the promise because it's done already: {}", promise, cause); + } + } + } + + private static boolean inExceptionCaught(Throwable cause) { + do { + StackTraceElement[] trace = cause.getStackTrace(); + if (trace != null) { + for (StackTraceElement t : trace) { + if (t == null) { + break; + } + if ("exceptionCaught".equals(t.getMethodName())) { + return true; + } + } + } + + cause = cause.getCause(); + } while (cause != null); + + return false; + } + + private ChannelHandlerInvokerUtil() { } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 2ac3e52c48..2353350620 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -215,8 +215,7 @@ import java.util.NoSuchElementException; * For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it * after the exchange. */ -public interface ChannelPipeline - extends Iterable> { +public interface ChannelPipeline extends Iterable> { /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. @@ -246,6 +245,20 @@ public interface ChannelPipeline */ ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} at the first position of this pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param name the name of the handler to insert first + * @param handler the handler to insert first + * + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified name or handler is {@code null} + */ + ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler); + /** * Appends a {@link ChannelHandler} at the last position of this pipeline. * @@ -274,6 +287,20 @@ public interface ChannelPipeline */ ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); + /** + * Appends a {@link ChannelHandler} at the last position of this pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param name the name of the handler to append + * @param handler the handler to append + * + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified name or handler is {@code null} + */ + ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler); + /** * Inserts a {@link ChannelHandler} before an existing handler of this * pipeline. @@ -310,6 +337,24 @@ public interface ChannelPipeline */ ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} before an existing handler of this + * pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param baseName the name of the existing handler + * @param name the name of the handler to insert before + * @param handler the handler to insert before + * + * @throws NoSuchElementException + * if there's no such entry with the specified {@code baseName} + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified baseName, name, or handler is {@code null} + */ + ChannelPipeline addBefore(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler); + /** * Inserts a {@link ChannelHandler} after an existing handler of this * pipeline. @@ -346,6 +391,24 @@ public interface ChannelPipeline */ ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); + /** + * Inserts a {@link ChannelHandler} after an existing handler of this + * pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param baseName the name of the existing handler + * @param name the name of the handler to insert after + * @param handler the handler to insert after + * + * @throws NoSuchElementException + * if there's no such entry with the specified {@code baseName} + * @throws IllegalArgumentException + * if there's an entry with the same name already in the pipeline + * @throws NullPointerException + * if the specified baseName, name, or handler is {@code null} + */ + ChannelPipeline addAfter(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler); + /** * Inserts a {@link ChannelHandler}s at the first position of this pipeline. * @@ -364,6 +427,15 @@ public interface ChannelPipeline */ ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); + /** + * Inserts a {@link ChannelHandler}s at the first position of this pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param handlers the handlers to insert first + * + */ + ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers); + /** * Inserts a {@link ChannelHandler}s at the last position of this pipeline. * @@ -382,6 +454,15 @@ public interface ChannelPipeline */ ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); + /** + * Inserts a {@link ChannelHandler}s at the last position of this pipeline. + * + * @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods + * @param handlers the handlers to insert last + * + */ + ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers); + /** * Removes the specified {@link ChannelHandler} from this pipeline. * diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 53afe53614..fff03fb744 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -18,18 +18,13 @@ package io.netty.channel; import io.netty.buffer.ByteBufAllocator; import io.netty.util.Attribute; import io.netty.util.AttributeKey; -import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.StringUtil; import java.net.SocketAddress; -import static io.netty.channel.DefaultChannelPipeline.*; - final class DefaultChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { volatile DefaultChannelHandlerContext next; @@ -43,21 +38,19 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private final ChannelHandler handler; private boolean removed; - // Will be set to null if no child executor should be used, otherwise it will be set to the - // child executor. - final EventExecutor executor; + final ChannelHandlerInvoker invoker; private ChannelFuture succeededFuture; // Lazily instantiated tasks used to trigger events to a handler with different executor. // These needs to be volatile as otherwise an other Thread may see an half initialized instance. // See the JMM for more details - private volatile Runnable invokeChannelReadCompleteTask; - private volatile Runnable invokeReadTask; - private volatile Runnable invokeChannelWritableStateChangedTask; - private volatile Runnable invokeFlushTask; + volatile Runnable invokeChannelReadCompleteTask; + volatile Runnable invokeReadTask; + volatile Runnable invokeChannelWritableStateChangedTask; + volatile Runnable invokeFlushTask; - DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, - ChannelHandler handler) { + DefaultChannelHandlerContext( + DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) { if (name == null) { throw new NullPointerException("name"); @@ -70,19 +63,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou this.pipeline = pipeline; this.name = name; this.handler = handler; - - if (group != null) { - // Pin one of the child executors once and remember it so that the same child executor - // is used to fire events for the same channel. - EventExecutor childExecutor = pipeline.childExecutors.get(group); - if (childExecutor == null) { - childExecutor = group.next(); - pipeline.childExecutors.put(group, childExecutor); - } - executor = childExecutor; - } else { - executor = null; - } + this.invoker = invoker; inbound = handler instanceof ChannelInboundHandler; outbound = handler instanceof ChannelOutboundHandler; @@ -130,10 +111,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou @Override public EventExecutor executor() { - if (executor == null) { - return channel().eventLoop(); + return invoker().executor(); + } + + public ChannelHandlerInvoker invoker() { + if (invoker == null) { + return channel.unsafe().invoker(); } else { - return executor; + return invoker; } } @@ -154,265 +139,68 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou @Override public ChannelHandlerContext fireChannelRegistered() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelRegistered(); - } else { - executor.execute(new OneTimeTask() { - @Override - public void run() { - next.invokeChannelRegistered(); - } - }); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelRegistered(next); return this; } - private void invokeChannelRegistered() { - try { - ((ChannelInboundHandler) handler).channelRegistered(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelUnregistered() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelUnregistered(); - } else { - executor.execute(new OneTimeTask() { - @Override - public void run() { - next.invokeChannelUnregistered(); - } - }); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelUnregistered(next); return this; } - @SuppressWarnings("deprecation") - private void invokeChannelUnregistered() { - try { - ((ChannelInboundHandler) handler).channelUnregistered(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelActive() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelActive(); - } else { - executor.execute(new OneTimeTask() { - @Override - public void run() { - next.invokeChannelActive(); - } - }); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelActive(next); return this; } - private void invokeChannelActive() { - try { - ((ChannelInboundHandler) handler).channelActive(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelInactive() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelInactive(); - } else { - executor.execute(new OneTimeTask() { - @Override - public void run() { - next.invokeChannelInactive(); - } - }); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelInactive(next); return this; } - private void invokeChannelInactive() { - try { - ((ChannelInboundHandler) handler).channelInactive(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override - public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { - if (cause == null) { - throw new NullPointerException("cause"); - } - - final DefaultChannelHandlerContext next = this.next; - - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeExceptionCaught(cause); - } else { - try { - executor.execute(new OneTimeTask() { - @Override - public void run() { - next.invokeExceptionCaught(cause); - } - }); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to submit an exceptionCaught() event.", t); - logger.warn("The exceptionCaught() event that was failed to submit was:", cause); - } - } - } - + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + DefaultChannelHandlerContext next = this.next; + next.invoker().invokeExceptionCaught(next, cause); return this; } - @SuppressWarnings("deprecation") - private void invokeExceptionCaught(final Throwable cause) { - try { - handler.exceptionCaught(this, cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by a user handler's " + - "exceptionCaught() method while handling the following exception:", cause); - } - } - } - @Override - public ChannelHandlerContext fireUserEventTriggered(final Object event) { - if (event == null) { - throw new NullPointerException("event"); - } - - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeUserEventTriggered(event); - } else { - executor.execute(new OneTimeTask() { - @Override - public void run() { - next.invokeUserEventTriggered(event); - } - }); - } + public ChannelHandlerContext fireUserEventTriggered(Object event) { + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker().invokeUserEventTriggered(next, event); return this; } - private void invokeUserEventTriggered(Object event) { - try { - ((ChannelInboundHandler) handler).userEventTriggered(this, event); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override - public ChannelHandlerContext fireChannelRead(final Object msg) { - if (msg == null) { - throw new NullPointerException("msg"); - } - - final DefaultChannelHandlerContext next = findContextInbound(); + public ChannelHandlerContext fireChannelRead(Object msg) { + DefaultChannelHandlerContext next = findContextInbound(); ReferenceCountUtil.touch(msg, next); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelRead(msg); - } else { - executor.execute(new OneTimeTask() { - @Override - public void run() { - next.invokeChannelRead(msg); - } - }); - } + next.invoker().invokeChannelRead(next, msg); return this; } - private void invokeChannelRead(Object msg) { - try { - ((ChannelInboundHandler) handler).channelRead(this, msg); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelReadComplete() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelReadComplete(); - } else { - Runnable task = next.invokeChannelReadCompleteTask; - if (task == null) { - next.invokeChannelReadCompleteTask = task = new Runnable() { - @Override - public void run() { - next.invokeChannelReadComplete(); - } - }; - } - executor.execute(task); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelReadComplete(next); return this; } - private void invokeChannelReadComplete() { - try { - ((ChannelInboundHandler) handler).channelReadComplete(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelHandlerContext fireChannelWritabilityChanged() { - final DefaultChannelHandlerContext next = findContextInbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelWritabilityChanged(); - } else { - Runnable task = next.invokeChannelWritableStateChangedTask; - if (task == null) { - next.invokeChannelWritableStateChangedTask = task = new Runnable() { - @Override - public void run() { - next.invokeChannelWritabilityChanged(); - } - }; - } - executor.execute(task); - } + DefaultChannelHandlerContext next = findContextInbound(); + next.invoker().invokeChannelWritabilityChanged(next); return this; } - private void invokeChannelWritabilityChanged() { - try { - ((ChannelInboundHandler) handler).channelWritabilityChanged(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelFuture bind(SocketAddress localAddress) { return bind(localAddress, newPromise()); @@ -445,294 +233,81 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { - if (localAddress == null) { - throw new NullPointerException("localAddress"); - } - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeBind(localAddress, promise); - } else { - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - next.invokeBind(localAddress, promise); - } - }, promise, null); - } - + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeBind(next, localAddress, promise); return promise; } - private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return connect(remoteAddress, null, promise); } @Override - public ChannelFuture connect( - final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - - if (remoteAddress == null) { - throw new NullPointerException("remoteAddress"); - } - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeConnect(remoteAddress, localAddress, promise); - } else { - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - next.invokeConnect(remoteAddress, localAddress, promise); - } - }, promise, null); - } - + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeConnect(next, remoteAddress, localAddress, promise); return promise; } - private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override - public ChannelFuture disconnect(final ChannelPromise promise) { - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - // Translate disconnect to close if the channel has no notion of disconnect-reconnect. - // So far, UDP/IP is the only transport that has such behavior. - if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); - } else { - next.invokeDisconnect(promise); - } - } else { - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); - } else { - next.invokeDisconnect(promise); - } - } - }, promise, null); + public ChannelFuture disconnect(ChannelPromise promise) { + if (!channel().metadata().hasDisconnect()) { + return close(promise); } + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeDisconnect(next, promise); return promise; } - private void invokeDisconnect(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override - public ChannelFuture close(final ChannelPromise promise) { - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeClose(promise); - } else { - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - next.invokeClose(promise); - } - }, promise, null); - } - + public ChannelFuture close(ChannelPromise promise) { + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeClose(next, promise); return promise; } - private void invokeClose(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override - public ChannelFuture deregister(final ChannelPromise promise) { - validatePromise(promise, false); - - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeDeregister(promise); - } else { - safeExecute(executor, new OneTimeTask() { - @Override - public void run() { - next.invokeDeregister(promise); - } - }, promise, null); - } - + public ChannelFuture deregister(ChannelPromise promise) { + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeDeregister(next, promise); return promise; } - @SuppressWarnings("deprecation") - private void invokeDeregister(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).deregister(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override public ChannelHandlerContext read() { - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeRead(); - } else { - Runnable task = next.invokeReadTask; - if (task == null) { - next.invokeReadTask = task = new Runnable() { - @Override - public void run() { - next.invokeRead(); - } - }; - } - executor.execute(task); - } - + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeRead(next); return this; } - private void invokeRead() { - try { - ((ChannelOutboundHandler) handler).read(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } @Override - public ChannelFuture write(final Object msg, final ChannelPromise promise) { - if (msg == null) { - throw new NullPointerException("msg"); - } - - validatePromise(promise, true); - - write(msg, false, promise); - + public ChannelFuture write(Object msg, ChannelPromise promise) { + DefaultChannelHandlerContext next = findContextOutbound(); + ReferenceCountUtil.touch(msg, next); + next.invoker().invokeWrite(next, msg, promise); return promise; } - private void invokeWrite(Object msg, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler).write(this, msg, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } - @Override public ChannelHandlerContext flush() { - final DefaultChannelHandlerContext next = findContextOutbound(); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeFlush(); - } else { - Runnable task = next.invokeFlushTask; - if (task == null) { - next.invokeFlushTask = task = new Runnable() { - @Override - public void run() { - next.invokeFlush(); - } - }; - } - safeExecute(executor, task, channel.voidPromise(), null); - } - + DefaultChannelHandlerContext next = findContextOutbound(); + next.invoker().invokeFlush(next); return this; } - private void invokeFlush() { - try { - ((ChannelOutboundHandler) handler).flush(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } - @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - if (msg == null) { - throw new NullPointerException("msg"); - } - - validatePromise(promise, true); - - write(msg, true, promise); - - return promise; - } - - private void write(Object msg, boolean flush, ChannelPromise promise) { - DefaultChannelHandlerContext next = findContextOutbound(); ReferenceCountUtil.touch(msg, next); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeWrite(msg, promise); - if (flush) { - next.invokeFlush(); - } - } else { - int size = channel.estimatorHandle().size(msg); - if (size > 0) { - ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.incrementPendingOutboundBytes(size); - } - } - Runnable task; - if (flush) { - task = WriteAndFlushTask.newInstance(next, msg, size, promise); - } else { - task = WriteTask.newInstance(next, msg, size, promise); - } - safeExecute(executor, task, promise, msg); - } + next.invoker().invokeWriteAndFlush(next, msg, promise); + return promise; } @Override @@ -740,53 +315,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou return writeAndFlush(msg, newPromise()); } - private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { - // only try to fail the promise if its not a VoidChannelPromise, as - // the VoidChannelPromise would also fire the cause through the pipeline - if (promise instanceof VoidChannelPromise) { - return; - } - - if (!promise.tryFailure(cause)) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to fail the promise because it's done already: {}", promise, cause); - } - } - } - - private void notifyHandlerException(Throwable cause) { - if (inExceptionCaught(cause)) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by a user handler " + - "while handling an exceptionCaught event", cause); - } - return; - } - - invokeExceptionCaught(cause); - } - - private static boolean inExceptionCaught(Throwable cause) { - do { - StackTraceElement[] trace = cause.getStackTrace(); - if (trace != null) { - for (StackTraceElement t : trace) { - if (t == null) { - break; - } - if ("exceptionCaught".equals(t.getMethodName())) { - return true; - } - } - } - - cause = cause.getCause(); - } while (cause != null); - - return false; - } - @Override public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); @@ -811,35 +339,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou return new FailedChannelFuture(channel(), executor(), cause); } - private void validatePromise(ChannelPromise promise, boolean allowVoidPromise) { - if (promise == null) { - throw new NullPointerException("promise"); - } - - if (promise.isDone()) { - throw new IllegalArgumentException("promise already done: " + promise); - } - - if (promise.channel() != channel()) { - throw new IllegalArgumentException(String.format( - "promise.channel does not match: %s (expected: %s)", promise.channel(), channel())); - } - - if (promise.getClass() == DefaultChannelPromise.class) { - return; - } - - if (!allowVoidPromise && promise instanceof VoidChannelPromise) { - throw new IllegalArgumentException( - StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation"); - } - - if (promise instanceof AbstractChannel.CloseFuture) { - throw new IllegalArgumentException( - StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline"); - } - } - private DefaultChannelHandlerContext findContextInbound() { DefaultChannelHandlerContext ctx = this; do { @@ -870,126 +369,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou return removed; } - private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { - try { - executor.execute(runnable); - } catch (Throwable cause) { - try { - promise.setFailure(cause); - } finally { - if (msg != null) { - ReferenceCountUtil.release(msg); - } - } - } - } - - abstract static class AbstractWriteTask extends OneTimeTask { - private final Recycler.Handle handle; - - private DefaultChannelHandlerContext ctx; - private Object msg; - private ChannelPromise promise; - private int size; - - private AbstractWriteTask(Recycler.Handle handle) { - this.handle = handle; - } - - protected static void init(AbstractWriteTask task, DefaultChannelHandlerContext ctx, - Object msg, int size, ChannelPromise promise) { - task.ctx = ctx; - task.msg = msg; - task.promise = promise; - task.size = size; - } - - @Override - public final void run() { - try { - if (size > 0) { - ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.decrementPendingOutboundBytes(size); - } - } - write(ctx, msg, promise); - } finally { - // Set to null so the GC can collect them directly - ctx = null; - msg = null; - promise = null; - recycle(handle); - } - } - - protected void write(DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - ctx.invokeWrite(msg, promise); - } - - protected abstract void recycle(Recycler.Handle handle); - } - - static final class WriteTask - extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected WriteTask newObject(Handle handle) { - return new WriteTask(handle); - } - }; - - private static WriteTask newInstance( - DefaultChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) { - WriteTask task = RECYCLER.get(); - init(task, ctx, msg, size, promise); - return task; - } - - private WriteTask(Recycler.Handle handle) { - super(handle); - } - - @Override - protected void recycle(Recycler.Handle handle) { - RECYCLER.recycle(this, handle); - } - } - - static final class WriteAndFlushTask extends AbstractWriteTask { - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected WriteAndFlushTask newObject(Handle handle) { - return new WriteAndFlushTask(handle); - } - }; - - private static WriteAndFlushTask newInstance( - DefaultChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) { - WriteAndFlushTask task = RECYCLER.get(); - init(task, ctx, msg, size, promise); - return task; - } - - private WriteAndFlushTask(Recycler.Handle handle) { - super(handle); - } - - @Override - public void write(DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - super.write(ctx, msg, promise); - ctx.invokeFlush(); - } - - @Override - protected void recycle(Recycler.Handle handle) { - RECYCLER.recycle(this, handle); - } - } - @Override public String toHintString() { return '\'' + name + "' will handle the message from this point."; diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java new file mode 100644 index 0000000000..e040751ac6 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerInvoker.java @@ -0,0 +1,544 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.util.Recycler; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.OneTimeTask; +import io.netty.util.internal.StringUtil; + +import java.net.SocketAddress; + +import static io.netty.channel.ChannelHandlerInvokerUtil.*; +import static io.netty.channel.DefaultChannelPipeline.*; + +public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker { + + private final EventExecutor executor; + + public DefaultChannelHandlerInvoker(EventExecutor executor) { + if (executor == null) { + throw new NullPointerException("executor"); + } + + this.executor = executor; + } + + @Override + public EventExecutor executor() { + return executor; + } + + @Override + public void invokeChannelRegistered(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelRegisteredNow(ctx); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + invokeChannelRegisteredNow(ctx); + } + }); + } + } + + @Override + public void invokeChannelUnregistered(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelUnregisteredNow(ctx); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + invokeChannelUnregisteredNow(ctx); + } + }); + } + } + + @Override + public void invokeChannelActive(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelActiveNow(ctx); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + invokeChannelActiveNow(ctx); + } + }); + } + } + + @Override + public void invokeChannelInactive(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelInactiveNow(ctx); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + invokeChannelInactiveNow(ctx); + } + }); + } + } + + @Override + public void invokeExceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + if (cause == null) { + throw new NullPointerException("cause"); + } + + if (executor.inEventLoop()) { + invokeExceptionCaughtNow(ctx, cause); + } else { + try { + executor.execute(new Runnable() { + @Override + public void run() { + invokeExceptionCaughtNow(ctx, cause); + } + }); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to submit an exceptionCaught() event.", t); + logger.warn("The exceptionCaught() event that was failed to submit was:", cause); + } + } + } + } + + @Override + public void invokeUserEventTriggered(final ChannelHandlerContext ctx, final Object event) { + if (event == null) { + throw new NullPointerException("event"); + } + + if (executor.inEventLoop()) { + invokeUserEventTriggeredNow(ctx, event); + } else { + safeExecuteInbound(new Runnable() { + @Override + public void run() { + invokeUserEventTriggeredNow(ctx, event); + } + }, event); + } + } + + @Override + public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) { + if (msg == null) { + throw new NullPointerException("msg"); + } + + if (executor.inEventLoop()) { + invokeChannelReadNow(ctx, msg); + } else { + safeExecuteInbound(new Runnable() { + @Override + public void run() { + invokeChannelReadNow(ctx, msg); + } + }, msg); + } + } + + @Override + public void invokeChannelReadComplete(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelReadCompleteNow(ctx); + } else { + DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + Runnable task = dctx.invokeChannelReadCompleteTask; + if (task == null) { + dctx.invokeChannelReadCompleteTask = task = new Runnable() { + @Override + public void run() { + invokeChannelReadCompleteNow(ctx); + } + }; + } + executor.execute(task); + } + } + + @Override + public void invokeChannelWritabilityChanged(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeChannelWritabilityChangedNow(ctx); + } else { + DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + Runnable task = dctx.invokeChannelWritableStateChangedTask; + if (task == null) { + dctx.invokeChannelWritableStateChangedTask = task = new Runnable() { + @Override + public void run() { + invokeChannelWritabilityChangedNow(ctx); + } + }; + } + executor.execute(task); + } + } + + @Override + public void invokeBind( + final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) { + if (localAddress == null) { + throw new NullPointerException("localAddress"); + } + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeBindNow(ctx, localAddress, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeBindNow(ctx, localAddress, promise); + } + }, promise); + } + } + + @Override + public void invokeConnect( + final ChannelHandlerContext ctx, + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (remoteAddress == null) { + throw new NullPointerException("remoteAddress"); + } + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeConnectNow(ctx, remoteAddress, localAddress, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeConnectNow(ctx, remoteAddress, localAddress, promise); + } + }, promise); + } + } + + @Override + public void invokeDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) { + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeDisconnectNow(ctx, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeDisconnectNow(ctx, promise); + } + }, promise); + } + } + + @Override + public void invokeClose(final ChannelHandlerContext ctx, final ChannelPromise promise) { + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeCloseNow(ctx, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeCloseNow(ctx, promise); + } + }, promise); + } + } + + @Override + public void invokeDeregister(final ChannelHandlerContext ctx, final ChannelPromise promise) { + validatePromise(ctx, promise, false); + + if (executor.inEventLoop()) { + invokeDeregisterNow(ctx, promise); + } else { + safeExecuteOutbound(new Runnable() { + @Override + public void run() { + invokeDeregisterNow(ctx, promise); + } + }, promise); + } + } + + @Override + public void invokeRead(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeReadNow(ctx); + } else { + DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + Runnable task = dctx.invokeReadTask; + if (task == null) { + dctx.invokeReadTask = task = new Runnable() { + @Override + public void run() { + invokeReadNow(ctx); + } + }; + } + executor.execute(task); + } + } + + @Override + public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg == null) { + throw new NullPointerException("msg"); + } + + validatePromise(ctx, promise, true); + invokeWrite(ctx, msg, false, promise); + } + + private void invokeWrite(ChannelHandlerContext ctx, Object msg, boolean flush, ChannelPromise promise) { + if (executor.inEventLoop()) { + invokeWriteNow(ctx, msg, promise); + if (flush) { + invokeFlushNow(ctx); + } + } else { + AbstractChannel channel = (AbstractChannel) ctx.channel(); + int size = channel.estimatorHandle().size(msg); + if (size > 0) { + ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); + // Check for null as it may be set to null if the channel is closed already + if (buffer != null) { + buffer.incrementPendingOutboundBytes(size); + } + } + Runnable task; + if (flush) { + task = WriteAndFlushTask.newInstance(ctx, msg, size, promise); + } else { + task = WriteTask.newInstance(ctx, msg, size, promise); + } + safeExecuteOutbound(task, promise, msg); + } + } + + @Override + public void invokeFlush(final ChannelHandlerContext ctx) { + if (executor.inEventLoop()) { + invokeFlushNow(ctx); + } else { + DefaultChannelHandlerContext dctx = (DefaultChannelHandlerContext) ctx; + Runnable task = dctx.invokeFlushTask; + if (task == null) { + dctx.invokeFlushTask = task = new Runnable() { + @Override + public void run() { + invokeFlushNow(ctx); + } + }; + } + executor.execute(task); + } + } + + @Override + public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg == null) { + throw new NullPointerException("msg"); + } + + validatePromise(ctx, promise, true); + + invokeWrite(ctx, msg, true, promise); + } + + private static void validatePromise(ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) { + if (ctx == null) { + throw new NullPointerException("ctx"); + } + + if (promise == null) { + throw new NullPointerException("promise"); + } + + if (promise.isDone()) { + throw new IllegalArgumentException("promise already done: " + promise); + } + + if (promise.channel() != ctx.channel()) { + throw new IllegalArgumentException(String.format( + "promise.channel does not match: %s (expected: %s)", promise.channel(), ctx.channel())); + } + + if (promise.getClass() == DefaultChannelPromise.class) { + return; + } + + if (!allowVoidPromise && promise instanceof VoidChannelPromise) { + throw new IllegalArgumentException( + StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation"); + } + + if (promise instanceof AbstractChannel.CloseFuture) { + throw new IllegalArgumentException( + StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline"); + } + } + + private void safeExecuteInbound(Runnable task, Object msg) { + boolean success = false; + try { + executor.execute(task); + success = true; + } finally { + if (!success) { + ReferenceCountUtil.release(msg); + } + } + } + + private void safeExecuteOutbound(Runnable task, ChannelPromise promise) { + try { + executor.execute(task); + } catch (Throwable cause) { + promise.setFailure(cause); + } + } + private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) { + try { + executor.execute(task); + } catch (Throwable cause) { + try { + promise.setFailure(cause); + } finally { + ReferenceCountUtil.release(msg); + } + } + } + + abstract static class AbstractWriteTask extends OneTimeTask { + private final Recycler.Handle handle; + + private ChannelHandlerContext ctx; + private Object msg; + private ChannelPromise promise; + private int size; + + protected AbstractWriteTask(Recycler.Handle handle) { + this.handle = handle; + } + + protected static void init( + AbstractWriteTask task, ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) { + task.ctx = ctx; + task.msg = msg; + task.promise = promise; + task.size = size; + } + + @Override + public final void run() { + try { + if (size > 0) { + ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); + // Check for null as it may be set to null if the channel is closed already + if (buffer != null) { + buffer.decrementPendingOutboundBytes(size); + } + } + write(ctx, msg, promise); + } finally { + // Set to null so the GC can collect them directly + ctx = null; + msg = null; + promise = null; + recycle(handle); + } + } + + protected void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + invokeWriteNow(ctx, msg, promise); + } + + protected abstract void recycle(Recycler.Handle handle); + } + + static final class WriteTask + extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected WriteTask newObject(Handle handle) { + return new WriteTask(handle); + } + }; + + static WriteTask newInstance(ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) { + WriteTask task = RECYCLER.get(); + init(task, ctx, msg, size, promise); + return task; + } + + private WriteTask(Recycler.Handle handle) { + super(handle); + } + + @Override + protected void recycle(Recycler.Handle handle) { + RECYCLER.recycle(this, handle); + } + } + + static final class WriteAndFlushTask extends AbstractWriteTask { + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected WriteAndFlushTask newObject(Handle handle) { + return new WriteAndFlushTask(handle); + } + }; + + static WriteAndFlushTask newInstance( + ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) { + WriteAndFlushTask task = RECYCLER.get(); + init(task, ctx, msg, size, promise); + return task; + } + + private WriteAndFlushTask(Recycler.Handle handle) { + super(handle); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + super.write(ctx, msg, promise); + invokeFlushNow(ctx); + } + + @Override + protected void recycle(Recycler.Handle handle) { + RECYCLER.recycle(this, handle); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 643f54f05e..9a4e697b43 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -36,6 +36,7 @@ import java.util.NoSuchElementException; import java.util.WeakHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * The default {@link ChannelPipeline} implementation. It is usually created @@ -49,10 +50,21 @@ final class DefaultChannelPipeline implements ChannelPipeline { private static final WeakHashMap, String>[] nameCaches = new WeakHashMap[Runtime.getRuntime().availableProcessors()]; + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater childInvokersUpdater; + static { for (int i = 0; i < nameCaches.length; i ++) { nameCaches[i] = new WeakHashMap, String>(); } + + @SuppressWarnings("rawtypes") + AtomicReferenceFieldUpdater updater; + updater = PlatformDependent.newAtomicReferenceFieldUpdater(DefaultChannelPipeline.class, "childInvokers"); + if (updater == null) { + updater = AtomicReferenceFieldUpdater.newUpdater(DefaultChannelPipeline.class, Map.class, "childInvokers"); + } + childInvokersUpdater = updater; } final AbstractChannel channel; @@ -63,8 +75,13 @@ final class DefaultChannelPipeline implements ChannelPipeline { private final Map name2ctx = new HashMap(4); - final Map childExecutors = - new IdentityHashMap(); + /** + * Updated by {@link #childInvokersUpdater}. + * + * @see #findInvoker(EventExecutorGroup) + */ + @SuppressWarnings("UnusedDeclaration") + private volatile Map childInvokers; DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { @@ -89,17 +106,20 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addFirst(String name, ChannelHandler handler) { - return addFirst(null, name, handler); + return addFirst((ChannelHandlerInvoker) null, name, handler); } @Override - public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) { + public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { + return addFirst(findInvoker(group), name, handler); + } + + @Override + public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); - addFirst0(name, newCtx); + addFirst0(name, new DefaultChannelHandlerContext(this, invoker, name, handler)); } - return this; } @@ -119,18 +139,20 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addLast(String name, ChannelHandler handler) { - return addLast(null, name, handler); + return addLast((ChannelHandlerInvoker) null, name, handler); } @Override - public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { + public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { + return addLast(findInvoker(group), name, handler); + } + + @Override + public ChannelPipeline addLast(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); - - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); - addLast0(name, newCtx); + addLast0(name, new DefaultChannelHandlerContext(this, invoker, name, handler)); } - return this; } @@ -150,17 +172,21 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { - return addBefore(null, baseName, name, handler); + return addBefore((ChannelHandlerInvoker) null, baseName, name, handler); + } + + @Override + public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { + return addBefore(findInvoker(group), baseName, name, handler); } @Override public ChannelPipeline addBefore( - EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { + ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) { synchronized (this) { DefaultChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); - addBefore0(name, ctx, newCtx); + addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler)); } return this; } @@ -180,20 +206,22 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { - return addAfter(null, baseName, name, handler); + return addAfter((ChannelHandlerInvoker) null, baseName, name, handler); + } + + @Override + public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { + return addAfter(findInvoker(group), baseName, name, handler); } @Override public ChannelPipeline addAfter( - EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { + ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) { synchronized (this) { DefaultChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); - - addAfter0(name, ctx, newCtx); + addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler)); } - return this; } @@ -213,11 +241,16 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addFirst(ChannelHandler... handlers) { - return addFirst(null, handlers); + return addFirst((ChannelHandlerInvoker) null, handlers); } @Override - public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { + public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) { + return addFirst(findInvoker(group), handlers); + } + + @Override + public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -234,7 +267,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { for (int i = size - 1; i >= 0; i --) { ChannelHandler h = handlers[i]; - addFirst(executor, generateName(h), h); + addFirst(invoker, generateName(h), h); } return this; @@ -242,11 +275,16 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addLast(ChannelHandler... handlers) { - return addLast(null, handlers); + return addLast((ChannelHandlerInvoker) null, handlers); } @Override - public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { + public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) { + return addLast(findInvoker(group), handlers); + } + + @Override + public ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -255,12 +293,45 @@ final class DefaultChannelPipeline implements ChannelPipeline { if (h == null) { break; } - addLast(executor, generateName(h), h); + addLast(invoker, generateName(h), h); } return this; } + private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) { + if (group == null) { + return null; + } + + // Lazily initialize the data structure that maps an EventExecutorGroup to a ChannelHandlerInvoker. + Map childInvokers = this.childInvokers; + if (childInvokers == null) { + childInvokers = new IdentityHashMap(); + if (!childInvokersUpdater.compareAndSet(this, null, childInvokers)) { + childInvokers = this.childInvokers; + } + } + + // Pick one of the child executors and remember its invoker + // so that the same invoker is used to fire events for the same channel. + ChannelHandlerInvoker invoker; + synchronized (childInvokers) { + invoker = childInvokers.get(group); + if (invoker == null) { + EventExecutor executor = group.next(); + if (executor instanceof EventLoop) { + invoker = ((EventLoop) executor).asInvoker(); + } else { + invoker = new DefaultChannelHandlerInvoker(executor); + } + childInvokers.put(group, invoker); + } + } + + return invoker; + } + String generateName(ChannelHandler handler) { WeakHashMap, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)]; Class handlerType = handler.getClass(); @@ -396,7 +467,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, ctx.executor, newName, newHandler); + new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { replace0(ctx, newName, newCtx); diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java index 13a2a396f2..a750a216fd 100644 --- a/transport/src/main/java/io/netty/channel/EventLoop.java +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -27,4 +27,10 @@ import io.netty.util.concurrent.EventExecutor; public interface EventLoop extends EventExecutor, EventLoopGroup { @Override EventLoopGroup parent(); + + /** + * Creates a new default {@link ChannelHandlerInvoker} implementation that uses this {@link EventLoop} to + * invoke event handler methods. + */ + ChannelHandlerInvoker asInvoker(); } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 0c09821668..81cc3dbaef 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -26,6 +26,8 @@ import java.util.concurrent.ThreadFactory; */ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { + private final ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(this); + protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { super(parent, threadFactory, addTaskWakesUp); } @@ -44,6 +46,11 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return (EventLoop) super.next(); } + @Override + public ChannelHandlerInvoker asInvoker() { + return invoker; + } + @Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 36df01fd37..0c5121bfe8 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -18,16 +18,21 @@ package io.netty.channel.embedded; import io.netty.channel.AbstractEventLoop; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelHandlerInvoker; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; -import io.netty.channel.EventLoop; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; +import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.TimeUnit; -final class EmbeddedEventLoop extends AbstractEventLoop { +import static io.netty.channel.ChannelHandlerInvokerUtil.*; + +final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandlerInvoker { private final Queue tasks = new ArrayDeque(2); @@ -82,9 +87,7 @@ final class EmbeddedEventLoop extends AbstractEventLoop { } @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - Thread.sleep(unit.toMillis(timeout)); + public boolean awaitTermination(long timeout, TimeUnit unit) { return false; } @@ -110,7 +113,103 @@ final class EmbeddedEventLoop extends AbstractEventLoop { } @Override - public EventLoop next() { + public ChannelHandlerInvoker asInvoker() { return this; } + + @Override + public EventExecutor executor() { + return this; + } + @Override + public void invokeChannelRegistered(ChannelHandlerContext ctx) { + invokeChannelRegisteredNow(ctx); + } + + @Override + public void invokeChannelActive(ChannelHandlerContext ctx) { + invokeChannelActiveNow(ctx); + } + + @Override + public void invokeChannelInactive(ChannelHandlerContext ctx) { + invokeChannelInactiveNow(ctx); + } + + @Override + public void invokeChannelUnregistered(ChannelHandlerContext ctx) { + invokeChannelUnregisteredNow(ctx); + } + + @Override + public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + invokeExceptionCaughtNow(ctx, cause); + } + + @Override + public void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event) { + invokeUserEventTriggeredNow(ctx, event); + } + + @Override + public void invokeChannelRead(ChannelHandlerContext ctx, Object msg) { + invokeChannelReadNow(ctx, msg); + } + + @Override + public void invokeChannelReadComplete(ChannelHandlerContext ctx) { + invokeChannelReadCompleteNow(ctx); + } + + @Override + public void invokeChannelWritabilityChanged(ChannelHandlerContext ctx) { + invokeChannelWritabilityChangedNow(ctx); + } + + @Override + public void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { + invokeBindNow(ctx, localAddress, promise); + } + + @Override + public void invokeConnect( + ChannelHandlerContext ctx, + SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + invokeConnectNow(ctx, remoteAddress, localAddress, promise); + } + + @Override + public void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise) { + invokeDisconnectNow(ctx, promise); + } + + @Override + public void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise) { + invokeCloseNow(ctx, promise); + } + + @Override + public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) { + invokeDeregisterNow(ctx, promise); + } + + @Override + public void invokeRead(ChannelHandlerContext ctx) { + invokeReadNow(ctx); + } + + @Override + public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + invokeWriteNow(ctx, msg, promise); + } + + @Override + public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + invokeWriteAndFlushNow(ctx, msg, promise); + } + + @Override + public void invokeFlush(ChannelHandlerContext ctx) { + invokeFlushNow(ctx); + } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index 805aca589b..584307816f 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -16,6 +16,7 @@ package io.netty.channel.nio; import io.netty.channel.Channel; +import io.netty.channel.EventLoop; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.util.concurrent.EventExecutor; @@ -92,8 +93,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { } @Override - protected EventExecutor newChild( - Executor executor, Object... args) throws Exception { + protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0]); } }