diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 705a864477..35fb446e5a 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -15,8 +15,6 @@ */ package io.netty.channel; -import static io.netty.channel.DefaultChannelPipeline.logger; - import io.netty.buffer.ByteBufAllocator; import io.netty.util.DefaultAttributeMap; import io.netty.util.Recycler; @@ -29,6 +27,8 @@ import io.netty.util.internal.StringUtil; import java.net.SocketAddress; +import static io.netty.channel.DefaultChannelPipeline.*; + abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { volatile AbstractChannelHandlerContext next; @@ -82,31 +82,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme this.outbound = outbound; } - /** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */ - void teardown() { - EventExecutor executor = executor(); - if (executor.inEventLoop()) { - teardown0(); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - teardown0(); - } - }); - } - } - - private void teardown0() { - AbstractChannelHandlerContext prev = this.prev; - if (prev != null) { - synchronized (pipeline) { - pipeline.remove0(this); - } - prev.teardown(); - } - } - @Override public Channel channel() { return channel; diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index ff4ccf3cee..80146ff243 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -19,6 +19,7 @@ import io.netty.channel.Channel.Unsafe; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -739,18 +740,76 @@ final class DefaultChannelPipeline implements ChannelPipeline { // Remove all handlers sequentially if channel is closed and unregistered. if (!channel.isOpen()) { - teardownAll(); + destroy(); } return this; } /** - * Removes all handlers from the pipeline one by one from tail (exclusive) to head (inclusive) to trigger - * handlerRemoved(). Note that the tail handler is excluded because it's neither an outbound handler nor it - * does anything in handlerRemoved(). + * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger + * handlerRemoved(). + * + * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext)}) + * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext)}) so that + * the handlers are removed after all events are handled. + * + * See: https://github.com/netty/netty/issues/3156 */ - private void teardownAll() { - tail.prev.teardown(); + private void destroy() { + destroyUp(head.next); + } + + private void destroyUp(AbstractChannelHandlerContext ctx) { + final Thread currentThread = Thread.currentThread(); + final AbstractChannelHandlerContext tail = this.tail; + for (;;) { + if (ctx == tail) { + destroyDown(currentThread, tail.prev); + break; + } + + final EventExecutor executor = ctx.executor(); + if (!executor.inEventLoop(currentThread)) { + final AbstractChannelHandlerContext finalCtx = ctx; + executor.execute(new OneTimeTask() { + @Override + public void run() { + destroyUp(finalCtx); + } + }); + break; + } + + ctx = ctx.next; + } + } + + private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx) { + // We have reached at tail; now traverse backwards. + final AbstractChannelHandlerContext head = this.head; + for (;;) { + if (ctx == head) { + break; + } + + final EventExecutor executor = ctx.executor(); + if (executor.inEventLoop(currentThread)) { + synchronized (this) { + remove0(ctx); + } + } else { + final AbstractChannelHandlerContext finalCtx = ctx; + executor.execute(new OneTimeTask() { + @Override + public void run() { + destroyDown(Thread.currentThread(), finalCtx); + } + }); + break; + } + + ctx = ctx.prev; + } } @Override