diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 462d9098f2..7b4f639b25 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -39,16 +39,30 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; + /** + * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. + */ + private static final int ADDED = 1; + /** + * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. + */ + private static final int REMOVED = 2; + /** + * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} + * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. + */ + private static final int INIT = 0; + private final boolean inbound; private final boolean outbound; private final DefaultChannelPipeline pipeline; private final String name; - private boolean handlerRemoved; // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. final EventExecutor executor; private ChannelFuture succeededFuture; + private int handlerState = INIT; // Lazily instantiated tasks used to trigger events to a handler with different executor. // These needs to be volatile as otherwise an other Thread may see an half initialized instance. @@ -119,10 +133,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeChannelRegistered() { - try { - ((ChannelInboundHandler) handler()).channelRegistered(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelRegistered(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelRegistered(); } } @@ -144,10 +162,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeChannelUnregistered() { - try { - ((ChannelInboundHandler) handler()).channelUnregistered(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelUnregistered(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelUnregistered(); } } @@ -169,10 +191,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeChannelActive() { - try { - ((ChannelInboundHandler) handler()).channelActive(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelActive(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelActive(); } } @@ -194,10 +220,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeChannelInactive() { - try { - ((ChannelInboundHandler) handler()).channelInactive(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelInactive(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelInactive(); } } @@ -231,14 +261,18 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeExceptionCaught(final Throwable cause) { - try { - handler().exceptionCaught(this, cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by a user handler's " + - "exceptionCaught() method while handling the following exception:", cause); + if (isAdded()) { + try { + handler().exceptionCaught(this, cause); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn( + "An exception was thrown by a user handler's " + + "exceptionCaught() method while handling the following exception:", cause); + } } + } else { + fireExceptionCaught(cause); } } @@ -264,10 +298,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeUserEventTriggered(Object event) { - try { - ((ChannelInboundHandler) handler()).userEventTriggered(this, event); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).userEventTriggered(this, event); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireUserEventTriggered(event); } } @@ -294,10 +332,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeChannelRead(Object msg) { - try { - ((ChannelInboundHandler) handler()).channelRead(this, msg); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelRead(this, msg); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelRead(msg); } } @@ -323,10 +365,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeChannelReadComplete() { - try { - ((ChannelInboundHandler) handler()).channelReadComplete(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelReadComplete(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelReadComplete(); } } @@ -352,10 +398,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeChannelWritabilityChanged() { - try { - ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelWritabilityChanged(); } } @@ -415,10 +465,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + bind(localAddress, promise); } } @@ -455,10 +509,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + connect(remoteAddress, localAddress, promise); } } @@ -495,10 +553,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeDisconnect(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).disconnect(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + disconnect(promise); } } @@ -526,10 +588,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeClose(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).close(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + close(promise); } } @@ -557,10 +623,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeDeregister(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).deregister(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).deregister(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + deregister(promise); } } @@ -587,10 +657,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeRead() { - try { - ((ChannelOutboundHandler) handler()).read(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).read(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + read(); } } @@ -621,6 +695,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeWrite(Object msg, ChannelPromise promise) { + if (isAdded()) { + invokeWrite0(msg, promise); + } else { + write(msg, promise); + } + } + + private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { @@ -651,6 +733,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeFlush() { + if (isAdded()) { + invokeFlush0(); + } else { + flush(); + } + } + + private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { @@ -675,14 +765,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } + private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { + if (isAdded()) { + invokeWrite0(msg, promise); + invokeFlush0(); + } else { + writeAndFlush(msg, promise); + } + } + private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { - next.invokeWrite(m, promise); if (flush) { - next.invokeFlush(); + next.invokeWriteAndFlush(m, promise); + } else { + next.invokeWrite(m, promise); } } else { AbstractWriteTask task; @@ -823,13 +923,29 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return channel().voidPromise(); } - void setRemoved() { - handlerRemoved = true; + final void setRemoved() { + handlerState = REMOVED; + } + + final void setAdded() { + handlerState = ADDED; + } + + /** + * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called + * yet. If not return {@code false} and if called or could not detect return {@code true}. + * + * If this method returns {@code true} we will not invoke the {@link ChannelHandler} but just forward the event. + * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list + * but not called {@link } + */ + private boolean isAdded() { + return handlerState == ADDED; } @Override public boolean isRemoved() { - return handlerRemoved; + return handlerState == REMOVED; } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 5209b92e6f..3babb42754 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -22,7 +22,6 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.OneTimeTask; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -36,8 +35,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.WeakHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; /** @@ -139,7 +136,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final EventExecutor executor; - final boolean inEventLoop; synchronized (this) { if (name == null) { name = generateName(handler); @@ -151,33 +147,27 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(group, name, handler); executor = executorSafe(newCtx.executor); + addFirst0(newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (executor == null) { - addFirst0(newCtx); callHandlerCallbackLater(newCtx, true); return this; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - addFirst0(newCtx); + + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerAdded0(newCtx); + } + }); + return this; } } - - if (inEventLoop) { - callHandlerAdded0(newCtx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - addFirst0(newCtx); - } - callHandlerAdded0(newCtx); - } - })); - } + callHandlerAdded0(newCtx); return this; } @@ -198,7 +188,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; - final boolean inEventLoop; synchronized (this) { if (name == null) { name = generateName(handler); @@ -210,32 +199,26 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(group, name, handler); executor = executorSafe(newCtx.executor); + addLast0(newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (executor == null) { - addLast0(newCtx); callHandlerCallbackLater(newCtx, true); return this; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - addLast0(newCtx); + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerAdded0(newCtx); + } + }); + return this; } } - if (inEventLoop) { - callHandlerAdded0(newCtx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - addLast0(newCtx); - } - callHandlerAdded0(newCtx); - } - })); - } + callHandlerAdded0(newCtx); return this; } @@ -258,7 +241,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; - final boolean inEventLoop; synchronized (this) { checkMultiplicity(handler); ctx = getContextOrDie(baseName); @@ -271,34 +253,27 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(group, name, handler); executor = executorSafe(newCtx.executor); + addBefore0(ctx, newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (executor == null) { - addBefore0(ctx, newCtx); callHandlerCallbackLater(newCtx, true); return this; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - addBefore0(ctx, newCtx); + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerAdded0(newCtx); + } + }); + return this; } } - - if (inEventLoop) { - callHandlerAdded0(newCtx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - addBefore0(ctx, newCtx); - } - callHandlerAdded0(newCtx); - } - })); - } + callHandlerAdded0(newCtx); return this; } @@ -320,7 +295,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; - final boolean inEventLoop; synchronized (this) { checkMultiplicity(handler); @@ -330,32 +304,26 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(group, name, handler); executor = executorSafe(newCtx.executor); + addAfter0(ctx, newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we remove the context from the pipeline and add a task that will call // ChannelHandler.handlerRemoved(...) once the channel is registered. if (executor == null) { - addAfter0(ctx, newCtx); callHandlerCallbackLater(newCtx, true); return this; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - addAfter0(ctx, newCtx); + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerAdded0(newCtx); + } + }); + return this; } } - if (inEventLoop) { - callHandlerAdded0(newCtx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - addAfter0(ctx, newCtx); - } - callHandlerAdded0(newCtx); - } - })); - } + callHandlerAdded0(newCtx); return this; } @@ -465,36 +433,30 @@ final class DefaultChannelPipeline implements ChannelPipeline { assert ctx != head && ctx != tail; final EventExecutor executor; - final boolean inEventLoop; synchronized (this) { executor = executorSafe(ctx.executor); + remove0(ctx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we remove the context from the pipeline and add a task that will call // ChannelHandler.handlerRemoved(...) once the channel is registered. if (executor == null) { - remove0(ctx); callHandlerCallbackLater(ctx, false); return ctx; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - remove0(ctx); + + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerRemoved0(ctx); + } + }); + return ctx; } } - if (inEventLoop) { - callHandlerRemoved0(ctx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - remove0(ctx); - } - callHandlerRemoved0(ctx); - } - })); - } + callHandlerRemoved0(ctx); return ctx; } @@ -545,7 +507,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext newCtx; final EventExecutor executor; - final boolean inEventLoop; synchronized (this) { checkMultiplicity(newHandler); if (newName == null) { @@ -560,42 +521,36 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(ctx.executor, newName, newHandler); executor = executorSafe(ctx.executor); + replace0(ctx, newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we replace the context in the pipeline // and add a task that will call ChannelHandler.handlerAdded(...) and // ChannelHandler.handlerRemoved(...) once the channel is registered. if (executor == null) { - replace0(ctx, newCtx); callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(ctx, false); return ctx.handler(); } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - replace0(ctx, newCtx); + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) + // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and + // those event handlers must be called after handlerAdded(). + callHandlerAdded0(newCtx); + callHandlerRemoved0(ctx); + } + }); + return ctx.handler(); } } - if (inEventLoop) { - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those - // event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - replace0(ctx, newCtx); - } - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and - // those event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - } - })); - } + // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) + // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those + // event handlers must be called after handlerAdded(). + callHandlerAdded0(newCtx); + callHandlerRemoved0(ctx); return ctx.handler(); } @@ -632,6 +587,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); + ctx.setAdded(); } catch (Throwable t) { boolean removed = false; try { @@ -674,33 +630,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - /** - * Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted. - * It is expected that the task performs any appropriate locking. - *

- * If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or - * {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is - * thrown instead.

- * - * @param future wait for this future - * @see Future#get() - * @throws Error if the task threw this. - * @throws RuntimeException if the task threw this. - * @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of - * {@link Throwable}. - */ - private static void waitForFuture(Future future) { - try { - future.get(); - } catch (ExecutionException ex) { - // In the arbitrary case, we can throw Error, RuntimeException, and Exception - PlatformDependent.throwException(ex.getCause()); - } catch (InterruptedException ex) { - // Interrupt the calling thread (note that this method is not called from the event loop) - Thread.currentThread().interrupt(); - } - } - @Override public ChannelHandler first() { ChannelHandlerContext first = firstContext(); @@ -1228,6 +1157,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); + setAdded(); } @Override @@ -1299,6 +1229,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); + setAdded(); } @Override diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 6380805b8a..4a8fea305f 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -33,6 +33,7 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import org.junit.After; import org.junit.AfterClass; @@ -700,16 +701,19 @@ public class DefaultChannelPipelineTest { assertTrue(removedQueue.isEmpty()); pipeline.channel().close().syncUninterruptibly(); - assertHandler(handler1, addedQueue.take()); - assertHandler(handler2, addedQueue.take()); - assertHandler(handler3, addedQueue.take()); - assertHandler(handler4, addedQueue.take()); + assertHandler(addedQueue.take(), handler1); + + // Depending on timing this can be handler2 or handler3 as these use different EventExecutorGroups. + assertHandler(addedQueue.take(), handler2, handler3, handler4); + assertHandler(addedQueue.take(), handler2, handler3, handler4); + assertHandler(addedQueue.take(), handler2, handler3, handler4); + assertTrue(addedQueue.isEmpty()); - assertHandler(handler4, removedQueue.take()); - assertHandler(handler3, removedQueue.take()); - assertHandler(handler2, removedQueue.take()); - assertHandler(handler1, removedQueue.take()); + assertHandler(removedQueue.take(), handler4); + assertHandler(removedQueue.take(), handler3); + assertHandler(removedQueue.take(), handler2); + assertHandler(removedQueue.take(), handler1); assertTrue(removedQueue.isEmpty()); } finally { group1.shutdownGracefully(); @@ -793,39 +797,7 @@ public class DefaultChannelPipelineTest { } } - @Test(timeout = 3000) - public void testHandlerAddBlocksUntilHandlerAddedCalled() { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - try { - final Promise promise = group1.next().newPromise(); - ChannelPipeline pipeline = new LocalChannel().pipeline(); - group.register(pipeline.channel()).syncUninterruptibly(); - - pipeline.addLast(new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - final AtomicBoolean handlerAddedCalled = new AtomicBoolean(); - ctx.pipeline().addLast(group1, new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - handlerAddedCalled.set(true); - } - }); - if (handlerAddedCalled.get()) { - promise.setSuccess(null); - } else { - promise.setFailure(new AssertionError("handlerAdded(...) was not called yet")); - } - } - }); - - promise.syncUninterruptibly(); - } finally { - group1.shutdownGracefully(); - } - } - - @Test + @Test(timeout = 2000) public void testAddRemoveHandlerCalledOnceRegistered() throws Throwable { ChannelPipeline pipeline = new LocalChannel().pipeline(); CallbackCheckHandler handler = new CallbackCheckHandler(); @@ -833,8 +805,8 @@ public class DefaultChannelPipelineTest { pipeline.addFirst(handler); pipeline.remove(handler); - assertFalse(handler.addedHandler.get()); - assertFalse(handler.removedHandler.get()); + assertNull(handler.addedHandler.getNow()); + assertNull(handler.removedHandler.getNow()); group.register(pipeline.channel()).syncUninterruptibly(); Throwable cause = handler.error.get(); @@ -846,7 +818,7 @@ public class DefaultChannelPipelineTest { assertTrue(handler.removedHandler.get()); } - @Test + @Test(timeout = 3000) public void testAddReplaceHandlerCalledOnceRegistered() throws Throwable { ChannelPipeline pipeline = new LocalChannel().pipeline(); CallbackCheckHandler handler = new CallbackCheckHandler(); @@ -855,10 +827,10 @@ public class DefaultChannelPipelineTest { pipeline.addFirst(handler); pipeline.replace(handler, null, handler2); - assertFalse(handler.addedHandler.get()); - assertFalse(handler.removedHandler.get()); - assertFalse(handler2.addedHandler.get()); - assertFalse(handler2.removedHandler.get()); + assertNull(handler.addedHandler.getNow()); + assertNull(handler.removedHandler.getNow()); + assertNull(handler2.addedHandler.getNow()); + assertNull(handler2.removedHandler.getNow()); group.register(pipeline.channel()).syncUninterruptibly(); Throwable cause = handler.error.get(); @@ -875,30 +847,71 @@ public class DefaultChannelPipelineTest { } assertTrue(handler2.addedHandler.get()); - assertFalse(handler2.removedHandler.get()); + assertNull(handler2.removedHandler.getNow()); pipeline.remove(handler2); assertTrue(handler2.removedHandler.get()); } + @Test(timeout = 3000) + public void testAddBefore() throws Throwable { + ChannelPipeline pipeline1 = new LocalChannel().pipeline(); + ChannelPipeline pipeline2 = new LocalChannel().pipeline(); + + EventLoopGroup defaultGroup = new DefaultEventLoopGroup(2); + try { + EventLoop eventLoop1 = defaultGroup.next(); + EventLoop eventLoop2 = defaultGroup.next(); + + eventLoop1.register(pipeline1.channel()).syncUninterruptibly(); + eventLoop2.register(pipeline2.channel()).syncUninterruptibly(); + + CountDownLatch latch = new CountDownLatch(2 * 10); + for (int i = 0; i < 10; i++) { + eventLoop1.execute(new TestTask(pipeline2, latch)); + eventLoop2.execute(new TestTask(pipeline1, latch)); + } + latch.await(); + } finally { + defaultGroup.shutdownGracefully(); + } + } + + private static final class TestTask implements Runnable { + + private final ChannelPipeline pipeline; + private final CountDownLatch latch; + + TestTask(ChannelPipeline pipeline, CountDownLatch latch) { + this.pipeline = pipeline; + this.latch = latch; + } + + @Override + public void run() { + pipeline.addLast(new ChannelInboundHandlerAdapter()); + latch.countDown(); + } + } + private static final class CallbackCheckHandler extends ChannelHandlerAdapter { - final AtomicBoolean addedHandler = new AtomicBoolean(); - final AtomicBoolean removedHandler = new AtomicBoolean(); + final Promise addedHandler = ImmediateEventExecutor.INSTANCE.newPromise(); + final Promise removedHandler = ImmediateEventExecutor.INSTANCE.newPromise(); final AtomicReference error = new AtomicReference(); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - if (!addedHandler.compareAndSet(false, true)) { + if (!addedHandler.trySuccess(true)) { error.set(new AssertionError("handlerAdded(...) called multiple times: " + ctx.name())); - } else if (removedHandler.get()) { + } else if (removedHandler.getNow() == Boolean.TRUE) { error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name())); } } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - if (!removedHandler.compareAndSet(false, true)) { + if (!removedHandler.trySuccess(true)) { error.set(new AssertionError("handlerRemoved(...) called multiple times: " + ctx.name())); - } else if (!addedHandler.get()) { + } else if (addedHandler.getNow() == Boolean.FALSE) { error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name())); } } @@ -923,9 +936,14 @@ public class DefaultChannelPipelineTest { } } - private static void assertHandler(CheckOrderHandler expected, CheckOrderHandler actual) throws Throwable { - assertSame(expected, actual); - actual.checkError(); + private static void assertHandler(CheckOrderHandler actual, CheckOrderHandler... handlers) throws Throwable { + for (CheckOrderHandler h : handlers) { + if (h == actual) { + actual.checkError(); + return; + } + } + fail("handler was not one of the expected handlers"); } private static final class CheckOrderHandler extends ChannelHandlerAdapter {