diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index ba7bb10d49..d49e6561a4 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -34,16 +34,30 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; + /** + * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. + */ + private static final int ADDED = 1; + /** + * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. + */ + private static final int REMOVED = 2; + /** + * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} + * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. + */ + private static final int INIT = 0; + private final boolean inbound; private final boolean outbound; private final DefaultChannelPipeline pipeline; private final String name; - private boolean handlerRemoved; // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. final EventExecutor executor; private ChannelFuture succeededFuture; + private int handlerState = INIT; // Lazily instantiated tasks used to trigger events to a handler with different executor. // These needs to be volatile as otherwise an other Thread may see an half initialized instance. @@ -114,10 +128,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeChannelRegistered() { - try { - ((ChannelInboundHandler) handler()).channelRegistered(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelRegistered(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelRegistered(); } } @@ -139,10 +157,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeChannelUnregistered() { - try { - ((ChannelInboundHandler) handler()).channelUnregistered(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelUnregistered(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelUnregistered(); } } @@ -164,10 +186,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeChannelActive() { - try { - ((ChannelInboundHandler) handler()).channelActive(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelActive(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelActive(); } } @@ -189,10 +215,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeChannelInactive() { - try { - ((ChannelInboundHandler) handler()).channelInactive(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelInactive(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelInactive(); } } @@ -226,14 +256,18 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeExceptionCaught(final Throwable cause) { - try { - handler().exceptionCaught(this, cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by a user handler's " + - "exceptionCaught() method while handling the following exception:", cause); + if (isAdded()) { + try { + handler().exceptionCaught(this, cause); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn( + "An exception was thrown by a user handler's " + + "exceptionCaught() method while handling the following exception:", cause); + } } + } else { + fireExceptionCaught(cause); } } @@ -259,10 +293,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeUserEventTriggered(Object event) { - try { - ((ChannelInboundHandler) handler()).userEventTriggered(this, event); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).userEventTriggered(this, event); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireUserEventTriggered(event); } } @@ -288,10 +326,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeChannelRead(Object msg) { - try { - ((ChannelInboundHandler) handler()).channelRead(this, msg); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelRead(this, msg); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelRead(msg); } } @@ -317,10 +359,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeChannelReadComplete() { - try { - ((ChannelInboundHandler) handler()).channelReadComplete(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelReadComplete(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelReadComplete(); } } @@ -346,10 +392,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeChannelWritabilityChanged() { - try { - ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelWritabilityChanged(); } } @@ -409,10 +459,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + bind(localAddress, promise); } } @@ -449,10 +503,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + connect(remoteAddress, localAddress, promise); } } @@ -489,10 +547,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeDisconnect(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).disconnect(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + disconnect(promise); } } @@ -520,10 +582,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeClose(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).close(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + close(promise); } } @@ -551,10 +617,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeDeregister(ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).deregister(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).deregister(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + deregister(promise); } } @@ -581,10 +651,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeRead() { - try { - ((ChannelOutboundHandler) handler()).read(this); - } catch (Throwable t) { - notifyHandlerException(t); + if (isAdded()) { + try { + ((ChannelOutboundHandler) handler()).read(this); + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + read(); } } @@ -615,6 +689,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeWrite(Object msg, ChannelPromise promise) { + if (isAdded()) { + invokeWrite0(msg, promise); + } else { + write(msg, promise); + } + } + + private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { @@ -645,6 +727,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme } private void invokeFlush() { + if (isAdded()) { + invokeFlush0(); + } else { + flush(); + } + } + + private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { @@ -669,13 +759,23 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme return promise; } + private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { + if (isAdded()) { + invokeWrite0(msg, promise); + invokeFlush0(); + } else { + writeAndFlush(msg, promise); + } + } + private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { - next.invokeWrite(msg, promise); if (flush) { - next.invokeFlush(); + next.invokeWriteAndFlush(msg, promise); + } else { + next.invokeWrite(msg, promise); } } else { AbstractWriteTask task; @@ -816,13 +916,29 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme return channel().voidPromise(); } - void setRemoved() { - handlerRemoved = true; + final void setRemoved() { + handlerState = REMOVED; + } + + final void setAdded() { + handlerState = ADDED; + } + + /** + * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called + * yet. If not return {@code false} and if called or could not detect return {@code true}. + * + * If this method returns {@code true} we will not invoke the {@link ChannelHandler} but just forward the event. + * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list + * but not called {@link } + */ + private boolean isAdded() { + return handlerState == ADDED; } @Override public boolean isRemoved() { - return handlerRemoved; + return handlerState == REMOVED; } private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 1143ef7862..fa45b3c2d9 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -21,7 +21,6 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.OneTimeTask; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -35,8 +34,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.WeakHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; /** @@ -128,7 +125,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final EventExecutor executor; - final boolean inEventLoop; synchronized (this) { checkDuplicateName(name); checkMultiplicity(handler); @@ -136,33 +132,27 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(group, name, handler); executor = executorSafe(newCtx.executor); + addFirst0(newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (executor == null) { - addFirst0(newCtx); callHandlerCallbackLater(newCtx, true); return this; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - addFirst0(newCtx); + + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerAdded0(newCtx); + } + }); + return this; } } - - if (inEventLoop) { - callHandlerAdded0(newCtx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - addFirst0(newCtx); - } - callHandlerAdded0(newCtx); - } - })); - } + callHandlerAdded0(newCtx); return this; } @@ -183,7 +173,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; - final boolean inEventLoop; synchronized (this) { checkDuplicateName(name); checkMultiplicity(handler); @@ -191,32 +180,26 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(group, name, handler); executor = executorSafe(newCtx.executor); + addLast0(newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (executor == null) { - addLast0(newCtx); callHandlerCallbackLater(newCtx, true); return this; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - addLast0(newCtx); + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerAdded0(newCtx); + } + }); + return this; } } - if (inEventLoop) { - callHandlerAdded0(newCtx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - addLast0(newCtx); - } - callHandlerAdded0(newCtx); - } - })); - } + callHandlerAdded0(newCtx); return this; } @@ -239,7 +222,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; - final boolean inEventLoop; synchronized (this) { checkMultiplicity(handler); ctx = getContextOrDie(baseName); @@ -248,34 +230,27 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(group, name, handler); executor = executorSafe(newCtx.executor); + addBefore0(ctx, newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (executor == null) { - addBefore0(ctx, newCtx); callHandlerCallbackLater(newCtx, true); return this; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - addBefore0(ctx, newCtx); + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerAdded0(newCtx); + } + }); + return this; } } - - if (inEventLoop) { - callHandlerAdded0(newCtx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - addBefore0(ctx, newCtx); - } - callHandlerAdded0(newCtx); - } - })); - } + callHandlerAdded0(newCtx); return this; } @@ -297,7 +272,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { final EventExecutor executor; final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; - final boolean inEventLoop; synchronized (this) { checkMultiplicity(handler); @@ -307,32 +281,26 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(group, name, handler); executor = executorSafe(newCtx.executor); + addAfter0(ctx, newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we remove the context from the pipeline and add a task that will call // ChannelHandler.handlerRemoved(...) once the channel is registered. if (executor == null) { - addAfter0(ctx, newCtx); callHandlerCallbackLater(newCtx, true); return this; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - addAfter0(ctx, newCtx); + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerAdded0(newCtx); + } + }); + return this; } } - if (inEventLoop) { - callHandlerAdded0(newCtx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - addAfter0(ctx, newCtx); - } - callHandlerAdded0(newCtx); - } - })); - } + callHandlerAdded0(newCtx); return this; } @@ -442,36 +410,30 @@ final class DefaultChannelPipeline implements ChannelPipeline { assert ctx != head && ctx != tail; final EventExecutor executor; - final boolean inEventLoop; synchronized (this) { executor = executorSafe(ctx.executor); + remove0(ctx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we remove the context from the pipeline and add a task that will call // ChannelHandler.handlerRemoved(...) once the channel is registered. if (executor == null) { - remove0(ctx); callHandlerCallbackLater(ctx, false); return ctx; } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - remove0(ctx); + + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + callHandlerRemoved0(ctx); + } + }); + return ctx; } } - if (inEventLoop) { - callHandlerRemoved0(ctx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - remove0(ctx); - } - callHandlerRemoved0(ctx); - } - })); - } + callHandlerRemoved0(ctx); return ctx; } @@ -522,7 +484,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext newCtx; final EventExecutor executor; - final boolean inEventLoop; synchronized (this) { checkMultiplicity(newHandler); boolean sameName = ctx.name().equals(newName); @@ -533,42 +494,36 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx = newContext(ctx.executor, newName, newHandler); executor = executorSafe(ctx.executor); + replace0(ctx, newCtx); + // If the executor is null it means that the channel was not registered on an eventloop yet. // In this case we replace the context in the pipeline // and add a task that will call ChannelHandler.handlerAdded(...) and // ChannelHandler.handlerRemoved(...) once the channel is registered. if (executor == null) { - replace0(ctx, newCtx); callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(ctx, false); return ctx.handler(); } - inEventLoop = executor.inEventLoop(); - if (inEventLoop) { - replace0(ctx, newCtx); + if (!executor.inEventLoop()) { + executor.execute(new OneTimeTask() { + @Override + public void run() { + // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) + // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and + // those event handlers must be called after handlerAdded(). + callHandlerAdded0(newCtx); + callHandlerRemoved0(ctx); + } + }); + return ctx.handler(); } } - if (inEventLoop) { - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those - // event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - } else { - waitForFuture(executor.submit(new OneTimeTask() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - replace0(ctx, newCtx); - } - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and - // those event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - } - })); - } + // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) + // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those + // event handlers must be called after handlerAdded(). + callHandlerAdded0(newCtx); + callHandlerRemoved0(ctx); return ctx.handler(); } @@ -605,6 +560,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); + ctx.setAdded(); } catch (Throwable t) { boolean removed = false; try { @@ -647,33 +603,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - /** - * Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted. - * It is expected that the task performs any appropriate locking. - *

- * If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or - * {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is - * thrown instead.

- * - * @param future wait for this future - * @see Future#get() - * @throws Error if the task threw this. - * @throws RuntimeException if the task threw this. - * @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of - * {@link Throwable}. - */ - private static void waitForFuture(Future future) { - try { - future.get(); - } catch (ExecutionException ex) { - // In the arbitrary case, we can throw Error, RuntimeException, and Exception - PlatformDependent.throwException(ex.getCause()); - } catch (InterruptedException ex) { - // Interrupt the calling thread (note that this method is not called from the event loop) - Thread.currentThread().interrupt(); - } - } - @Override public ChannelHandler first() { ChannelHandlerContext first = firstContext(); @@ -1176,6 +1105,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); + setAdded(); } @Override @@ -1247,6 +1177,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); + setAdded(); } @Override diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 81dc0549fd..383eae9e7e 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -34,6 +34,7 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import org.junit.After; import org.junit.AfterClass; @@ -687,16 +688,19 @@ public class DefaultChannelPipelineTest { assertTrue(removedQueue.isEmpty()); pipeline.channel().close().syncUninterruptibly(); - assertHandler(handler1, addedQueue.take()); - assertHandler(handler2, addedQueue.take()); - assertHandler(handler3, addedQueue.take()); - assertHandler(handler4, addedQueue.take()); + assertHandler(addedQueue.take(), handler1); + + // Depending on timing this can be handler2 or handler3 as these use different EventExecutorGroups. + assertHandler(addedQueue.take(), handler2, handler3, handler4); + assertHandler(addedQueue.take(), handler2, handler3, handler4); + assertHandler(addedQueue.take(), handler2, handler3, handler4); + assertTrue(addedQueue.isEmpty()); - assertHandler(handler4, removedQueue.take()); - assertHandler(handler3, removedQueue.take()); - assertHandler(handler2, removedQueue.take()); - assertHandler(handler1, removedQueue.take()); + assertHandler(removedQueue.take(), handler4); + assertHandler(removedQueue.take(), handler3); + assertHandler(removedQueue.take(), handler2); + assertHandler(removedQueue.take(), handler1); assertTrue(removedQueue.isEmpty()); } finally { group1.shutdownGracefully(); @@ -780,39 +784,7 @@ public class DefaultChannelPipelineTest { } } - @Test(timeout = 3000) - public void testHandlerAddBlocksUntilHandlerAddedCalled() { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - try { - final Promise promise = group1.next().newPromise(); - ChannelPipeline pipeline = new LocalChannel().pipeline(); - group.register(pipeline.channel()).syncUninterruptibly(); - - pipeline.addLast(new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - final AtomicBoolean handlerAddedCalled = new AtomicBoolean(); - ctx.pipeline().addLast(group1, new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - handlerAddedCalled.set(true); - } - }); - if (handlerAddedCalled.get()) { - promise.setSuccess(null); - } else { - promise.setFailure(new AssertionError("handlerAdded(...) was not called yet")); - } - } - }); - - promise.syncUninterruptibly(); - } finally { - group1.shutdownGracefully(); - } - } - - @Test + @Test(timeout = 2000) public void testAddRemoveHandlerCalledOnceRegistered() throws Throwable { ChannelPipeline pipeline = new LocalChannel().pipeline(); CallbackCheckHandler handler = new CallbackCheckHandler(); @@ -820,8 +792,8 @@ public class DefaultChannelPipelineTest { pipeline.addFirst(handler); pipeline.remove(handler); - assertFalse(handler.addedHandler.get()); - assertFalse(handler.removedHandler.get()); + assertNull(handler.addedHandler.getNow()); + assertNull(handler.removedHandler.getNow()); group.register(pipeline.channel()).syncUninterruptibly(); Throwable cause = handler.error.get(); @@ -833,7 +805,7 @@ public class DefaultChannelPipelineTest { assertTrue(handler.removedHandler.get()); } - @Test + @Test(timeout = 3000) public void testAddReplaceHandlerCalledOnceRegistered() throws Throwable { ChannelPipeline pipeline = new LocalChannel().pipeline(); CallbackCheckHandler handler = new CallbackCheckHandler(); @@ -842,10 +814,10 @@ public class DefaultChannelPipelineTest { pipeline.addFirst(handler); pipeline.replace(handler, "newHandler", handler2); - assertFalse(handler.addedHandler.get()); - assertFalse(handler.removedHandler.get()); - assertFalse(handler2.addedHandler.get()); - assertFalse(handler2.removedHandler.get()); + assertNull(handler.addedHandler.getNow()); + assertNull(handler.removedHandler.getNow()); + assertNull(handler2.addedHandler.getNow()); + assertNull(handler2.removedHandler.getNow()); group.register(pipeline.channel()).syncUninterruptibly(); Throwable cause = handler.error.get(); @@ -862,30 +834,71 @@ public class DefaultChannelPipelineTest { } assertTrue(handler2.addedHandler.get()); - assertFalse(handler2.removedHandler.get()); + assertNull(handler2.removedHandler.getNow()); pipeline.remove(handler2); assertTrue(handler2.removedHandler.get()); } + @Test(timeout = 3000) + public void testAddBefore() throws Throwable { + ChannelPipeline pipeline1 = new LocalChannel().pipeline(); + ChannelPipeline pipeline2 = new LocalChannel().pipeline(); + + EventLoopGroup defaultGroup = new LocalEventLoopGroup(2); + try { + EventLoop eventLoop1 = defaultGroup.next(); + EventLoop eventLoop2 = defaultGroup.next(); + + eventLoop1.register(pipeline1.channel()).syncUninterruptibly(); + eventLoop2.register(pipeline2.channel()).syncUninterruptibly(); + + CountDownLatch latch = new CountDownLatch(2 * 10); + for (int i = 0; i < 10; i++) { + eventLoop1.execute(new TestTask(pipeline2, latch)); + eventLoop2.execute(new TestTask(pipeline1, latch)); + } + latch.await(); + } finally { + defaultGroup.shutdownGracefully(); + } + } + + private static final class TestTask implements Runnable { + + private final ChannelPipeline pipeline; + private final CountDownLatch latch; + + TestTask(ChannelPipeline pipeline, CountDownLatch latch) { + this.pipeline = pipeline; + this.latch = latch; + } + + @Override + public void run() { + pipeline.addLast(new ChannelInboundHandlerAdapter()); + latch.countDown(); + } + } + private static final class CallbackCheckHandler extends ChannelHandlerAdapter { - final AtomicBoolean addedHandler = new AtomicBoolean(); - final AtomicBoolean removedHandler = new AtomicBoolean(); + final Promise addedHandler = ImmediateEventExecutor.INSTANCE.newPromise(); + final Promise removedHandler = ImmediateEventExecutor.INSTANCE.newPromise(); final AtomicReference error = new AtomicReference(); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - if (!addedHandler.compareAndSet(false, true)) { + if (!addedHandler.trySuccess(true)) { error.set(new AssertionError("handlerAdded(...) called multiple times: " + ctx.name())); - } else if (removedHandler.get()) { + } else if (removedHandler.getNow() == Boolean.TRUE) { error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name())); } } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - if (!removedHandler.compareAndSet(false, true)) { + if (!removedHandler.trySuccess(true)) { error.set(new AssertionError("handlerRemoved(...) called multiple times: " + ctx.name())); - } else if (!addedHandler.get()) { + } else if (addedHandler.getNow() == Boolean.FALSE) { error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name())); } } @@ -910,9 +923,14 @@ public class DefaultChannelPipelineTest { } } - private static void assertHandler(CheckOrderHandler expected, CheckOrderHandler actual) throws Throwable { - assertSame(expected, actual); - actual.checkError(); + private static void assertHandler(CheckOrderHandler actual, CheckOrderHandler... handlers) throws Throwable { + for (CheckOrderHandler h : handlers) { + if (h == actual) { + actual.checkError(); + return; + } + } + fail("handler was not one of the expected handlers"); } private static final class CheckOrderHandler extends ChannelHandlerAdapter {