From 8e72071d7648df705fc2372dc63b4840d33846f1 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 30 Jan 2019 13:34:20 +0100 Subject: [PATCH] =?UTF-8?q?Remove=20ability=20to=20specify=20a=20custom=20?= =?UTF-8?q?EventExecutor=20when=20adding=20handlers=E2=80=A6=20(#8778)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motiviation: In the past we allowed to use different EventExecutors for different ChannelHandlers in the ChannelPipeline. This introduced a lot of complexity while not providing much gain. Also it made the pipeline racy in terms of adding / remove handlers in some situations. This feature is not really used in the wild and can be easily archived by offloading heavy logic to an Executor by the user itself. Modifications: - Remove the ability to provide custom EventExecutor when adding handlers to the pipeline. - Remove testcode that is not needed any more - Ensure a handler is correctly visible in the pipeline when asked for it by the user while not be used until the EventLoop runs. This ensures correct ordering and visibility. - Correctly remove ChannelHandlers from pipeline when scheduling of handlerAdded(...) callbacks fail. Result: Remove races in DefaultChannelPipeline and simplify implementation of AbstractChannelHandlerContext. --- .../transport/socket/SocketEchoTest.java | 85 +- .../transport/socket/SocketStartTlsTest.java | 20 +- .../io/netty/channel/AbstractChannel.java | 11 +- .../AbstractChannelHandlerContext.java | 637 ++++++-------- .../io/netty/channel/ChannelPipeline.java | 112 +-- .../channel/DefaultChannelHandlerContext.java | 9 +- .../netty/channel/DefaultChannelPipeline.java | 824 +++++++++--------- .../netty/channel/AbstractEventLoopTest.java | 45 - .../netty/channel/ChannelInitializerTest.java | 193 ---- .../channel/ChannelOutboundBufferTest.java | 87 -- .../channel/DefaultChannelPipelineTest.java | 253 +----- .../local/LocalTransportThreadModelTest.java | 597 ------------- .../local/LocalTransportThreadModelTest3.java | 326 ------- 13 files changed, 755 insertions(+), 2444 deletions(-) delete mode 100644 transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java delete mode 100644 transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 4c4ee73faf..1781723d86 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -22,13 +22,8 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -42,29 +37,17 @@ public class SocketEchoTest extends AbstractSocketTest { private static final Random random = new Random(); static final byte[] data = new byte[1048576]; - private static EventExecutorGroup group; - static { random.nextBytes(data); } - @BeforeClass - public static void createGroup() { - group = new DefaultEventExecutorGroup(2); - } - - @AfterClass - public static void destroyGroup() throws Exception { - group.shutdownGracefully().sync(); - } - @Test(timeout = 30000) public void testSimpleEcho() throws Throwable { run(); } public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, false, false, true); + testSimpleEcho0(sb, cb, false, true); } @Test(timeout = 30000) @@ -73,25 +56,7 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, false, false, false); - } - - @Test//(timeout = 30000) - public void testSimpleEchoWithAdditionalExecutor() throws Throwable { - run(); - } - - public void testSimpleEchoWithAdditionalExecutor(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, true, false, true); - } - - @Test//(timeout = 30000) - public void testSimpleEchoWithAdditionalExecutorNotAutoRead() throws Throwable { - run(); - } - - public void testSimpleEchoWithAdditionalExecutorNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, true, false, false); + testSimpleEcho0(sb, cb, false, false); } @Test//(timeout = 30000) @@ -100,7 +65,7 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, false, true, true); + testSimpleEcho0(sb, cb, true, true); } @Test//(timeout = 30000) @@ -109,48 +74,24 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoWithVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, false, true, false); - } - - @Test(timeout = 30000) - public void testSimpleEchoWithAdditionalExecutorAndVoidPromise() throws Throwable { - run(); - } - - public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, true, true, true); + testSimpleEcho0(sb, cb, true, false); } private static void testSimpleEcho0( - ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise, boolean autoRead) + ServerBootstrap sb, Bootstrap cb, boolean voidPromise, boolean autoRead) throws Throwable { final EchoHandler sh = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead); - if (additionalExecutor) { - sb.childHandler(new ChannelInitializer() { - @Override - protected void initChannel(Channel c) throws Exception { - c.pipeline().addLast(group.next(), sh); - } - }); - cb.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel c) throws Exception { - c.pipeline().addLast(group.next(), ch); - } - }); - } else { - sb.childHandler(sh); - sb.handler(new ChannelInboundHandlerAdapter() { - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - cause.printStackTrace(); - } - }); - cb.handler(ch); - } + sb.childHandler(sh); + sb.handler(new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + } + }); + cb.handler(ch); sb.childOption(ChannelOption.AUTO_READ, autoRead); cb.option(ChannelOption.AUTO_READ, autoRead); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index 973e92be1d..73e1e6f7c6 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -35,13 +35,9 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.util.SelfSignedCertificate; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -66,7 +62,6 @@ public class SocketStartTlsTest extends AbstractSocketTest { private static final LogLevel LOG_LEVEL = LogLevel.TRACE; private static final File CERT_FILE; private static final File KEY_FILE; - private static EventExecutorGroup executor; static { SelfSignedCertificate ssc; @@ -106,16 +101,6 @@ public class SocketStartTlsTest extends AbstractSocketTest { return params; } - @BeforeClass - public static void createExecutor() { - executor = new DefaultEventExecutorGroup(2); - } - - @AfterClass - public static void shutdownExecutor() throws Exception { - executor.shutdownGracefully().sync(); - } - private final SslContext serverCtx; private final SslContext clientCtx; @@ -146,7 +131,6 @@ public class SocketStartTlsTest extends AbstractSocketTest { sb.childOption(ChannelOption.AUTO_READ, autoRead); cb.option(ChannelOption.AUTO_READ, autoRead); - final EventExecutorGroup executor = SocketStartTlsTest.executor; SSLEngine sse = serverCtx.newEngine(PooledByteBufAllocator.DEFAULT); SSLEngine cse = clientCtx.newEngine(PooledByteBufAllocator.DEFAULT); @@ -159,7 +143,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); - p.addLast(executor.next(), sh); + p.addLast(sh); } }); @@ -169,7 +153,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); - p.addLast(executor.next(), ch); + p.addLast(ch); } }); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index cd339533c0..f0facca702 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -60,6 +60,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final ChannelId id; private final Unsafe unsafe; private final ChannelPipeline pipeline; + private final ChannelFuture succeedFuture; private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false); private final CloseFuture closeFuture = new CloseFuture(this); @@ -83,6 +84,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.eventLoop = validateEventLoop(eventLoop); id = newId(); + succeedFuture = new SucceededChannelFuture(this, eventLoop); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } @@ -97,6 +99,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.eventLoop = validateEventLoop(eventLoop); this.id = id; + succeedFuture = new SucceededChannelFuture(this, eventLoop); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } @@ -325,22 +328,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public ChannelPromise newPromise() { - return pipeline.newPromise(); + return new DefaultChannelPromise(this, eventLoop); } @Override public ChannelProgressivePromise newProgressivePromise() { - return pipeline.newProgressivePromise(); + return new DefaultChannelProgressivePromise(this, eventLoop); } @Override public ChannelFuture newSucceededFuture() { - return pipeline.newSucceededFuture(); + return succeedFuture; } @Override public ChannelFuture newFailedFuture(Throwable cause) { - return pipeline.newFailedFuture(cause); + return new FailedChannelFuture(this, eventLoop, cause); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index e557f2bdca..fad6f9205e 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -24,7 +24,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.FastThreadLocal; -import io.netty.util.concurrent.OrderedEventExecutor; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.ThrowableUtil; @@ -45,24 +44,20 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); - volatile AbstractChannelHandlerContext next; - volatile AbstractChannelHandlerContext prev; + AbstractChannelHandlerContext next; + AbstractChannelHandlerContext prev; private static final AtomicIntegerFieldUpdater HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState"); - - /** - * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called. - */ - private static final int ADD_PENDING = 1; /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. */ - private static final int ADD_COMPLETE = 2; + private static final int ADD_COMPLETE = 1; + /** * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ - private static final int REMOVE_COMPLETE = 3; + private static final int REMOVE_COMPLETE = 2; /** * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. @@ -107,11 +102,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap private final DefaultChannelPipeline pipeline; private final String name; - private final boolean ordered; - // Will be set to null if no child executor should be used, otherwise it will be set to the - // child executor. - final EventExecutor executor; private ChannelFuture succeededFuture; // Lazily instantiated tasks used to trigger events to a handler with different executor. @@ -120,14 +111,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap private volatile int handlerState = INIT; - AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, + AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, String name, Class handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; - this.executor = executor; this.executionMask = mask(handlerClass); - // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. - ordered = executor == null || executor instanceof OrderedEventExecutor; } /** @@ -248,15 +236,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return channel().config().getAllocator(); } - @Override - public EventExecutor executor() { - if (executor == null) { - return channel().eventLoop(); - } else { - return executor; - } - } - @Override public String name() { return name; @@ -264,126 +243,105 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelRegistered() { - invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED)); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelRegistered(); + } else { + executor.execute(this::findAndInvokeChannelRegistered); + } return this; } - static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelRegistered(); - } else { - executor.execute(next::invokeChannelRegistered); - } + private void findAndInvokeChannelRegistered() { + findContextInbound(MASK_CHANNEL_REGISTERED).invokeChannelRegistered(); } - private void invokeChannelRegistered() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelRegistered(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelRegistered(); + final void invokeChannelRegistered() { + try { + ((ChannelInboundHandler) handler()).channelRegistered(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelUnregistered() { - invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED)); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelUnregistered(); + } else { + executor.execute(this::findAndInvokeChannelUnregistered); + } return this; } - static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelUnregistered(); - } else { - executor.execute(next::invokeChannelUnregistered); - } + private void findAndInvokeChannelUnregistered() { + findContextInbound(MASK_CHANNEL_UNREGISTERED).invokeChannelUnregistered(); } - private void invokeChannelUnregistered() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelUnregistered(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelUnregistered(); + final void invokeChannelUnregistered() { + try { + ((ChannelInboundHandler) handler()).channelUnregistered(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelActive() { - invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE)); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelActive(); + } else { + executor.execute(this::findAndInvokeChannelActive); + } return this; } - static void invokeChannelActive(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelActive(); - } else { - executor.execute(next::invokeChannelActive); - } + private void findAndInvokeChannelActive() { + findContextInbound(MASK_CHANNEL_ACTIVE).invokeChannelActive(); } - private void invokeChannelActive() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelActive(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelActive(); + final void invokeChannelActive() { + try { + ((ChannelInboundHandler) handler()).channelActive(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelInactive() { - invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE)); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelInactive(); + } else { + executor.execute(this::findAndInvokeChannelInactive); + } return this; } - static void invokeChannelInactive(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelInactive(); - } else { - executor.execute(next::invokeChannelInactive); - } + private void findAndInvokeChannelInactive() { + findContextInbound(MASK_CHANNEL_INACTIVE).invokeChannelInactive(); } - private void invokeChannelInactive() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelInactive(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelInactive(); + final void invokeChannelInactive() { + try { + ((ChannelInboundHandler) handler()).channelInactive(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override - public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { - invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause); - return this; - } - - static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) { + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { ObjectUtil.checkNotNull(cause, "cause"); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeExceptionCaught(cause); + findAndInvokeExceptionCaught(cause); } else { try { - executor.execute(() -> next.invokeExceptionCaught(cause)); + executor.execute(() -> findAndInvokeExceptionCaught(cause)); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to submit an exceptionCaught() event.", t); @@ -391,146 +349,137 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } } } + return this; } - private void invokeExceptionCaught(final Throwable cause) { - if (invokeHandler()) { - try { - handler().exceptionCaught(this, cause); - } catch (Throwable error) { - if (logger.isDebugEnabled()) { - logger.debug( + private void findAndInvokeExceptionCaught(Throwable cause) { + findContextInbound(MASK_EXCEPTION_CAUGHT).invokeExceptionCaught(cause); + } + + final void invokeExceptionCaught(final Throwable cause) { + try { + handler().exceptionCaught(this, cause); + } catch (Throwable error) { + if (logger.isDebugEnabled()) { + logger.debug( "An exception {}" + - "was thrown by a user handler's exceptionCaught() " + - "method while handling the following exception:", + "was thrown by a user handler's exceptionCaught() " + + "method while handling the following exception:", ThrowableUtil.stackTraceToString(error), cause); - } else if (logger.isWarnEnabled()) { - logger.warn( + } else if (logger.isWarnEnabled()) { + logger.warn( "An exception '{}' [enable DEBUG level for full stacktrace] " + - "was thrown by a user handler's exceptionCaught() " + - "method while handling the following exception:", error, cause); - } + "was thrown by a user handler's exceptionCaught() " + + "method while handling the following exception:", error, cause); } - } else { - fireExceptionCaught(cause); } } @Override - public ChannelHandlerContext fireUserEventTriggered(final Object event) { - invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event); + public ChannelHandlerContext fireUserEventTriggered(Object event) { + ObjectUtil.checkNotNull(event, "event"); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeUserEventTriggered(event); + } else { + executor.execute(() -> findAndInvokeUserEventTriggered(event)); + } return this; } - static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) { - ObjectUtil.checkNotNull(event, "event"); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeUserEventTriggered(event); - } else { - executor.execute(() -> next.invokeUserEventTriggered(event)); - } + private void findAndInvokeUserEventTriggered(Object event) { + findContextInbound(MASK_USER_EVENT_TRIGGERED).invokeUserEventTriggered(event); } - private void invokeUserEventTriggered(Object event) { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).userEventTriggered(this, event); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireUserEventTriggered(event); + final void invokeUserEventTriggered(Object event) { + try { + ((ChannelInboundHandler) handler()).userEventTriggered(this, event); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelRead(final Object msg) { - invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); + ObjectUtil.checkNotNull(msg, "msg"); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelRead(msg); + } else { + try { + executor.execute(() -> findAndInvokeChannelRead(msg)); + } catch (Throwable cause) { + ReferenceCountUtil.release(msg); + throw cause; + } + } return this; } - static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { - final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelRead(m); - } else { - executor.execute(() -> next.invokeChannelRead(m)); - } + private void findAndInvokeChannelRead(Object msg) { + findContextInbound(MASK_CHANNEL_READ).invokeChannelRead(msg); } - private void invokeChannelRead(Object msg) { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelRead(this, msg); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelRead(msg); + final void invokeChannelRead(Object msg) { + final Object m = pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), this); + try { + ((ChannelInboundHandler) handler()).channelRead(this, m); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelReadComplete() { - invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE)); - return this; - } - - static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeChannelReadComplete(); + findAndInvokeChannelReadComplete(); } else { - Tasks tasks = next.invokeTasks; + Tasks tasks = invokeTasks; if (tasks == null) { - next.invokeTasks = tasks = new Tasks(next); + invokeTasks = tasks = new Tasks(this); } executor.execute(tasks.invokeChannelReadCompleteTask); } + return this; } - private void invokeChannelReadComplete() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelReadComplete(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelReadComplete(); + private void findAndInvokeChannelReadComplete() { + findContextInbound(MASK_CHANNEL_READ_COMPLETE).invokeChannelReadComplete(); + } + + final void invokeChannelReadComplete() { + try { + ((ChannelInboundHandler) handler()).channelReadComplete(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelWritabilityChanged() { - invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED)); - return this; - } - - static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeChannelWritabilityChanged(); + findAndInvokeChannelWritabilityChanged(); } else { - Tasks tasks = next.invokeTasks; + Tasks tasks = invokeTasks; if (tasks == null) { - next.invokeTasks = tasks = new Tasks(next); + invokeTasks = tasks = new Tasks(this); } executor.execute(tasks.invokeChannelWritableStateChangedTask); } + return this; } - private void invokeChannelWritabilityChanged() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelWritabilityChanged(); + private void findAndInvokeChannelWritabilityChanged() { + findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED).invokeChannelWritabilityChanged(); + } + + final void invokeChannelWritabilityChanged() { + try { + ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @@ -579,25 +528,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeBind(localAddress, promise); + findAndInvokeBind(localAddress, promise); } else { - safeExecute(executor, () -> next.invokeBind(localAddress, promise), promise, null); + safeExecute(executor, () -> findAndInvokeBind(localAddress, promise), promise, null); } return promise; } + private void findAndInvokeBind(SocketAddress localAddress, ChannelPromise promise) { + findContextOutbound(MASK_BIND).invokeBind(localAddress, promise); + } + private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - bind(localAddress, promise); + try { + ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -618,25 +566,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeConnect(remoteAddress, localAddress, promise); + findAndInvokeConnect(remoteAddress, localAddress, promise); } else { - safeExecute(executor, () -> next.invokeConnect(remoteAddress, localAddress, promise), promise, null); + safeExecute(executor, () -> findAndInvokeConnect(remoteAddress, localAddress, promise), promise, null); } return promise; } + private void findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + findContextOutbound(MASK_CONNECT).invokeConnect(remoteAddress, localAddress, promise); + } + private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - connect(remoteAddress, localAddress, promise); + try { + ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -647,37 +594,38 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { // Translate disconnect to close if the channel has no notion of disconnect-reconnect. // So far, UDP/IP is the only transport that has such behavior. if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); + findAndInvokeClose(promise); } else { - next.invokeDisconnect(promise); + findAndInvokeDisconnect(promise); } } else { safeExecute(executor, () -> { + // Translate disconnect to close if the channel has no notion of disconnect-reconnect. + // So far, UDP/IP is the only transport that has such behavior. if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); + findAndInvokeClose(promise); } else { - next.invokeDisconnect(promise); + findAndInvokeDisconnect(promise); } }, promise, null); } return promise; } + private void findAndInvokeDisconnect(ChannelPromise promise) { + findContextOutbound(MASK_DISCONNECT).invokeDisconnect(promise); + } + private void invokeDisconnect(ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - disconnect(promise); + try { + ((ChannelOutboundHandler) handler()).disconnect(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -688,26 +636,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeClose(promise); + findAndInvokeClose(promise); } else { - safeExecute(executor, () -> next.invokeClose(promise), promise, null); + safeExecute(executor, () -> findAndInvokeClose(promise), promise, null); } - return promise; } + private void findAndInvokeClose(ChannelPromise promise) { + findContextOutbound(MASK_CLOSE).invokeClose(promise); + } + private void invokeClose(ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - close(promise); + try { + ((ChannelOutboundHandler) handler()).close(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -718,26 +664,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_REGISTER); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeRegister(promise); + findAndInvokeRegister(promise); } else { - safeExecute(executor, () -> next.invokeRegister(promise), promise, null); + safeExecute(executor, () -> findAndInvokeRegister(promise), promise, null); } - return promise; } + private void findAndInvokeRegister(ChannelPromise promise) { + findContextOutbound(MASK_REGISTER).invokeRegister(promise); + } + private void invokeRegister(ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).register(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - register(promise); + try { + ((ChannelOutboundHandler) handler()).register(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -748,55 +692,51 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeDeregister(promise); + findAndInvokeDeregister(promise); } else { - safeExecute(executor, () -> next.invokeDeregister(promise), promise, null); + safeExecute(executor, () -> findAndInvokeDeregister(promise), promise, null); } - return promise; } + private void findAndInvokeDeregister(ChannelPromise promise) { + findContextOutbound(MASK_DEREGISTER).invokeDeregister(promise); + } + private void invokeDeregister(ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).deregister(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - deregister(promise); + try { + ((ChannelOutboundHandler) handler()).deregister(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @Override public ChannelHandlerContext read() { - final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeRead(); + findAndInvokeRead(); } else { - Tasks tasks = next.invokeTasks; + Tasks tasks = invokeTasks; if (tasks == null) { - next.invokeTasks = tasks = new Tasks(next); + invokeTasks = tasks = new Tasks(this); } executor.execute(tasks.invokeReadTask); } - return this; } + private void findAndInvokeRead() { + findContextOutbound(MASK_READ).invokeRead(); + } + private void invokeRead() { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).read(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - read(); + try { + ((ChannelOutboundHandler) handler()).read(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @@ -813,16 +753,9 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeWrite(Object msg, ChannelPromise promise) { - if (invokeHandler()) { - invokeWrite0(msg, promise); - } else { - write(msg, promise); - } - } - - private void invokeWrite0(Object msg, ChannelPromise promise) { + final Object m = pipeline.touch(msg, this); try { - ((ChannelOutboundHandler) handler()).write(this, msg, promise); + ((ChannelOutboundHandler) handler()).write(this, m, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } @@ -830,14 +763,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext flush() { - final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeFlush(); + findAndInvokeFlush(); } else { - Tasks tasks = next.invokeTasks; + Tasks tasks = invokeTasks; if (tasks == null) { - next.invokeTasks = tasks = new Tasks(next); + invokeTasks = tasks = new Tasks(this); } safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null); } @@ -845,15 +777,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return this; } - private void invokeFlush() { - if (invokeHandler()) { - invokeFlush0(); - } else { - flush(); - } + private void findAndInvokeFlush() { + findContextOutbound(MASK_FLUSH).invokeFlush(); } - private void invokeFlush0() { + private void invokeFlush() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { @@ -868,12 +796,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { - if (invokeHandler()) { - invokeWrite0(msg, promise); - invokeFlush0(); - } else { - writeAndFlush(msg, promise); - } + invokeWrite(msg, promise); + invokeFlush(); } private void write(Object msg, boolean flush, ChannelPromise promise) { @@ -889,23 +813,23 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap throw e; } - final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); - final Object m = pipeline.touch(msg, next); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { + final AbstractChannelHandlerContext next = findContextOutbound(flush ? + (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); if (flush) { - next.invokeWriteAndFlush(m, promise); + next.invokeWriteAndFlush(msg, promise); } else { - next.invokeWrite(m, promise); + next.invokeWrite(msg, promise); } } else { final AbstractWriteTask task; if (flush) { - task = WriteAndFlushTask.newInstance(next, m, promise); + task = WriteAndFlushTask.newInstance(this, msg, promise); } else { - task = WriteTask.newInstance(next, m, promise); + task = WriteTask.newInstance(this, msg, promise); } - if (!safeExecute(executor, task, promise, m)) { + if (!safeExecute(executor, task, promise, msg)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // @@ -1041,28 +965,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return channel().voidPromise(); } - final void setRemoved() { + private void setRemoved() { handlerState = REMOVE_COMPLETE; } final boolean setAddComplete() { - for (;;) { - int oldState = handlerState; - if (oldState == REMOVE_COMPLETE) { - return false; - } - // Ensure we never update when the handlerState is REMOVE_COMPLETE already. - // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not - // exposing ordering guarantees. - if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { - return true; - } - } - } - - final void setAddPending() { - boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING); - assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved(). + // Ensure we never update when the handlerState is REMOVE_COMPLETE already. + // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not + // exposing ordering guarantees. + return HANDLER_STATE_UPDATER.getAndSet(this, ADD_COMPLETE) != REMOVE_COMPLETE; } final void callHandlerAdded() throws Exception { @@ -1085,20 +996,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } } - /** - * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called - * yet. If not return {@code false} and if called or could not detect return {@code true}. - * - * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event. - * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list - * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}. - */ - private boolean invokeHandler() { - // Store in local variable to reduce volatile reads. - int handlerState = this.handlerState; - return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); - } - @Override public boolean isRemoved() { return handlerState == REMOVE_COMPLETE; @@ -1140,15 +1037,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')'; } + private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = + SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); + + // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment + private static final int WRITE_TASK_OVERHEAD = + SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); + abstract static class AbstractWriteTask implements Runnable { - private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = - SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); - - // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment - private static final int WRITE_TASK_OVERHEAD = - SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); - private final Recycler.Handle handle; private AbstractChannelHandlerContext ctx; private Object msg; @@ -1174,11 +1071,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } } + protected abstract AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx); @Override public final void run() { try { decrementPendingOutboundBytes(); - write(ctx, msg, promise); + AbstractChannelHandlerContext next = findContext(ctx); + write(next, msg, promise); } finally { recycle(); } @@ -1227,6 +1126,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return task; } + @Override + protected AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx) { + return ctx.findContextOutbound(MASK_WRITE); + } + private WriteTask(Recycler.Handle handle) { super(handle); } @@ -1252,6 +1156,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap super(handle); } + @Override + protected AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx) { + return ctx.findContextOutbound(MASK_WRITE | MASK_FLUSH); + } + @Override public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { super.write(ctx, msg, promise); @@ -1265,11 +1174,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap private final Runnable invokeChannelWritableStateChangedTask; private final Runnable invokeFlushTask; - Tasks(AbstractChannelHandlerContext next) { - invokeChannelReadCompleteTask = next::invokeChannelReadComplete; - invokeReadTask = next::invokeRead; - invokeChannelWritableStateChangedTask = next::invokeChannelWritabilityChanged; - invokeFlushTask = next::invokeFlush; + Tasks(AbstractChannelHandlerContext ctx) { + invokeChannelReadCompleteTask = ctx::findAndInvokeChannelReadComplete; + invokeReadTask = ctx::findAndInvokeRead; + invokeChannelWritableStateChangedTask = ctx::invokeChannelWritabilityChanged; + invokeFlushTask = ctx::findAndInvokeFlush; } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 8a8571af15..386c0c76ed 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -16,22 +16,19 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; -import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NoSuchElementException; /** * A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a - * {@link Channel}. {@link ChannelPipeline} implements an advanced form of the + * {@link Channel}. {@link ChannelPipeline} implements an advanced form of the * Intercepting Filter pattern * to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline * interact with each other. @@ -179,7 +176,7 @@ import java.util.NoSuchElementException; *

Building a pipeline

*

* A user is supposed to have one or more {@link ChannelHandler}s in a pipeline to receive I/O events (e.g. read) and - * to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers + * to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers * in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the * protocol and business logic: * @@ -192,7 +189,6 @@ import java.util.NoSuchElementException; * and it could be represented as shown in the following example: * *

- * static final {@link EventExecutorGroup} group = new {@link DefaultEventExecutorGroup}(16);
  * ...
  *
  * {@link ChannelPipeline} pipeline = ch.pipeline();
@@ -200,12 +196,9 @@ import java.util.NoSuchElementException;
  * pipeline.addLast("decoder", new MyProtocolDecoder());
  * pipeline.addLast("encoder", new MyProtocolEncoder());
  *
- * // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
- * // in a different thread than an I/O thread so that the I/O thread is not blocked by
- * // a time-consuming task.
- * // If your business logic is fully asynchronous or finished very quickly, you don't
- * // need to specify a group.
- * pipeline.addLast(group.next(), "handler", new MyBusinessLogicHandler());
+ * // If your business logic does block or take a lot of time you should offload the work to an extra
+ * // {@link java.util.concurrent.Executor} to ensure you don't block the {@link EventLoop}.
+ * pipeline.addLast("handler",  new MyBusinessLogicHandler());
  * 
* *

Thread safety

@@ -215,7 +208,7 @@ import java.util.NoSuchElementException; * after the exchange. */ public interface ChannelPipeline - extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable> { + extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable> { /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. @@ -230,21 +223,6 @@ public interface ChannelPipeline */ ChannelPipeline addFirst(String name, ChannelHandler handler); - /** - * Inserts a {@link ChannelHandler} at the first position of this pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} - * methods - * @param name the name of the handler to insert first - * @param handler the handler to insert first - * - * @throws IllegalArgumentException - * if there's an entry with the same name already in the pipeline - * @throws NullPointerException - * if the specified handler is {@code null} - */ - ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler); - /** * Appends a {@link ChannelHandler} at the last position of this pipeline. * @@ -258,21 +236,6 @@ public interface ChannelPipeline */ ChannelPipeline addLast(String name, ChannelHandler handler); - /** - * Appends a {@link ChannelHandler} at the last position of this pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} - * methods - * @param name the name of the handler to append - * @param handler the handler to append - * - * @throws IllegalArgumentException - * if there's an entry with the same name already in the pipeline - * @throws NullPointerException - * if the specified handler is {@code null} - */ - ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler); - /** * Inserts a {@link ChannelHandler} before an existing handler of this * pipeline. @@ -290,25 +253,6 @@ public interface ChannelPipeline */ ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); - /** - * Inserts a {@link ChannelHandler} before an existing handler of this - * pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} - * methods - * @param baseName the name of the existing handler - * @param name the name of the handler to insert before - * @param handler the handler to insert before - * - * @throws NoSuchElementException - * if there's no such entry with the specified {@code baseName} - * @throws IllegalArgumentException - * if there's an entry with the same name already in the pipeline - * @throws NullPointerException - * if the specified baseName or handler is {@code null} - */ - ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler); - /** * Inserts a {@link ChannelHandler} after an existing handler of this * pipeline. @@ -326,25 +270,6 @@ public interface ChannelPipeline */ ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); - /** - * Inserts a {@link ChannelHandler} after an existing handler of this - * pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} - * methods - * @param baseName the name of the existing handler - * @param name the name of the handler to insert after - * @param handler the handler to insert after - * - * @throws NoSuchElementException - * if there's no such entry with the specified {@code baseName} - * @throws IllegalArgumentException - * if there's an entry with the same name already in the pipeline - * @throws NullPointerException - * if the specified baseName or handler is {@code null} - */ - ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler); - /** * Inserts {@link ChannelHandler}s at the first position of this pipeline. * @@ -353,16 +278,6 @@ public interface ChannelPipeline */ ChannelPipeline addFirst(ChannelHandler... handlers); - /** - * Inserts {@link ChannelHandler}s at the first position of this pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s - * methods. - * @param handlers the handlers to insert first - * - */ - ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers); - /** * Inserts {@link ChannelHandler}s at the last position of this pipeline. * @@ -371,16 +286,6 @@ public interface ChannelPipeline */ ChannelPipeline addLast(ChannelHandler... handlers); - /** - * Inserts {@link ChannelHandler}s at the last position of this pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s - * methods. - * @param handlers the handlers to insert last - * - */ - ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers); - /** * Removes the specified {@link ChannelHandler} from this pipeline. * @@ -624,4 +529,9 @@ public interface ChannelPipeline @Override ChannelPipeline flush(); + + /** + * Returns the {@link EventExecutor} which is used by all {@link ChannelHandler}s in the pipeline. + */ + EventExecutor executor(); } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 3a3fe3412b..14c41b7acb 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -23,8 +23,8 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; DefaultChannelHandlerContext( - DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { - super(pipeline, executor, name, ObjectUtil.checkNotNull(handler, "handler").getClass()); + DefaultChannelPipeline pipeline, String name, ChannelHandler handler) { + super(pipeline, name, ObjectUtil.checkNotNull(handler, "handler").getClass()); this.handler = handler; } @@ -32,4 +32,9 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { public ChannelHandler handler() { return handler; } + + @Override + public EventExecutor executor() { + return pipeline().executor(); + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 2e9c101ea4..f1daeaca06 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -28,6 +28,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -35,6 +36,8 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.IntSupplier; +import java.util.function.Predicate; /** * The default {@link ChannelPipeline} implementation. It is usually created @@ -65,12 +68,13 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled(); + private final List handlers = new ArrayList<>(4); private volatile MessageSizeEstimator.Handle estimatorHandle; - protected DefaultChannelPipeline(Channel channel) { + public DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); - succeededFuture = new SucceededChannelFuture(channel, null); + succeededFuture = new SucceededChannelFuture(channel, channel.eventLoop()); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); @@ -95,8 +99,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { return touch ? ReferenceCountUtil.touch(msg, next) : msg; } - private AbstractChannelHandlerContext newContext(EventExecutor executor, String name, ChannelHandler handler) { - return new DefaultChannelHandlerContext(this, executor, name, handler); + private AbstractChannelHandlerContext newContext(String name, ChannelHandler handler) { + return new DefaultChannelHandlerContext(this, name, handler); } @Override @@ -105,29 +109,37 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public final ChannelPipeline addFirst(String name, ChannelHandler handler) { - return addFirst(null, name, handler); + public final EventExecutor executor() { + return channel().eventLoop(); } @Override - public final ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler) { - final AbstractChannelHandlerContext newCtx; - synchronized (this) { - checkMultiplicity(handler); - name = filterName(name, handler); + public final ChannelPipeline addFirst(String name, ChannelHandler handler) { + checkMultiplicity(handler); + if (name == null) { + name = generateName(handler); + } - newCtx = newContext(executor, name, handler); - - addFirst0(newCtx); - - EventExecutor ctxExecutor = newCtx.executor(); - if (!ctxExecutor.inEventLoop()) { - newCtx.setAddPending(); - ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); - return this; + AbstractChannelHandlerContext newCtx = newContext(name, handler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + if (context(name) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + name); + } + handlers.add(0, newCtx); + if (!inEventLoop) { + try { + executor.execute(() -> addFirst0(newCtx)); + return this; + } catch (Throwable cause) { + handlers.remove(0); + throw cause; + } } } - callHandlerAdded0(newCtx); + + addFirst0(newCtx); return this; } @@ -137,31 +149,36 @@ public class DefaultChannelPipeline implements ChannelPipeline { newCtx.next = nextCtx; head.next = newCtx; nextCtx.prev = newCtx; + callHandlerAdded0(newCtx); } @Override public final ChannelPipeline addLast(String name, ChannelHandler handler) { - return addLast(null, name, handler); - } + checkMultiplicity(handler); + if (name == null) { + name = generateName(handler); + } - @Override - public final ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler) { - final AbstractChannelHandlerContext newCtx; + AbstractChannelHandlerContext newCtx = newContext(name, handler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); synchronized (this) { - checkMultiplicity(handler); - - newCtx = newContext(executor, filterName(name, handler), handler); - - addLast0(newCtx); - - EventExecutor ctxExecutor = newCtx.executor(); - if (!ctxExecutor.inEventLoop()) { - newCtx.setAddPending(); - ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); - return this; + if (context(name) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + name); + } + handlers.add(newCtx); + if (!inEventLoop) { + try { + executor.execute(() -> addLast0(newCtx)); + return this; + } catch (Throwable cause) { + handlers.remove(handlers.size() - 1); + throw cause; + } } } - callHandlerAdded0(newCtx); + + addLast0(newCtx); return this; } @@ -171,89 +188,101 @@ public class DefaultChannelPipeline implements ChannelPipeline { newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; + callHandlerAdded0(newCtx); } @Override public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { - return addBefore(null, baseName, name, handler); - } - - @Override - public final ChannelPipeline addBefore( - EventExecutor executor, String baseName, String name, ChannelHandler handler) { - final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; - synchronized (this) { - checkMultiplicity(handler); - name = filterName(name, handler); - ctx = getContextOrDie(baseName); - newCtx = newContext(executor, name, handler); + checkMultiplicity(handler); + if (name == null) { + name = generateName(handler); + } - addBefore0(ctx, newCtx); + AbstractChannelHandlerContext newCtx = newContext(name, handler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int i = findCtxIdx(context -> context.name().equals(baseName)); - EventExecutor ctxExecutor = newCtx.executor(); - if (!ctxExecutor.inEventLoop()) { - newCtx.setAddPending(); - ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); - return this; + if (i == -1) { + throw new NoSuchElementException(baseName); + } + + if (context(name) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + name); + } + ctx = handlers.get(i); + handlers.add(i, newCtx); + if (!inEventLoop) { + try { + executor.execute(() -> addBefore0(ctx, newCtx)); + return this; + } catch (Throwable cause) { + handlers.remove(i); + throw cause; + } } } - callHandlerAdded0(newCtx); + + addBefore0(ctx, newCtx); return this; } - private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { + private void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { newCtx.prev = ctx.prev; newCtx.next = ctx; ctx.prev.next = newCtx; ctx.prev = newCtx; - } - - private String filterName(String name, ChannelHandler handler) { - if (name == null) { - return generateName(handler); - } - checkDuplicateName(name); - return name; + callHandlerAdded0(newCtx); } @Override public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { - return addAfter(null, baseName, name, handler); - } - - @Override - public final ChannelPipeline addAfter( - EventExecutor executor, String baseName, String name, ChannelHandler handler) { - final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; - synchronized (this) { - checkMultiplicity(handler); - name = filterName(name, handler); - ctx = getContextOrDie(baseName); + checkMultiplicity(handler); + if (name == null) { + name = generateName(handler); + } - newCtx = newContext(executor, name, handler); + AbstractChannelHandlerContext newCtx = newContext(name, handler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int i = findCtxIdx(context -> context.name().equals(baseName)); - addAfter0(ctx, newCtx); + if (i == -1) { + throw new NoSuchElementException(baseName); + } - EventExecutor ctxExecutor = newCtx.executor(); - if (!ctxExecutor.inEventLoop()) { - newCtx.setAddPending(); - ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); - return this; + if (context(name) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + name); + } + ctx = handlers.get(i); + handlers.add(i + 1, newCtx); + if (!inEventLoop) { + try { + executor.execute(() -> addAfter0(ctx, newCtx)); + return this; + } catch (Throwable cause) { + handlers.remove(i + 1); + throw cause; + } } } - callHandlerAdded0(newCtx); + + addAfter0(ctx, newCtx); return this; } - private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { + private void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { newCtx.prev = ctx; newCtx.next = ctx.next; ctx.next.prev = newCtx; ctx.next = newCtx; + callHandlerAdded0(newCtx); } public final ChannelPipeline addFirst(ChannelHandler handler) { @@ -262,11 +291,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPipeline addFirst(ChannelHandler... handlers) { - return addFirst(null, handlers); - } - - @Override - public final ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -283,7 +307,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { for (int i = size - 1; i >= 0; i --) { ChannelHandler h = handlers[i]; - addFirst(executor, null, h); + addFirst(null, h); } return this; @@ -295,11 +319,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { - return addLast(null, handlers); - } - - @Override - public final ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -308,7 +327,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (h == null) { break; } - addLast(executor, null, h); + addLast(null, h); } return this; @@ -323,18 +342,21 @@ public class DefaultChannelPipeline implements ChannelPipeline { cache.put(handlerType, name); } - // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid - // any name conflicts. Note that we don't cache the names generated here. - if (context0(name) != null) { - String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'. - for (int i = 1;; i ++) { - String newName = baseName + i; - if (context0(newName) == null) { - name = newName; - break; + synchronized (handlers) { + // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid + // any name conflicts. Note that we don't cache the names generated here. + if (context(name) != null) { + String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'. + for (int i = 1;; i ++) { + String newName = baseName + i; + if (context(newName) == null) { + name = newName; + break; + } } } } + return name; } @@ -342,141 +364,214 @@ public class DefaultChannelPipeline implements ChannelPipeline { return StringUtil.simpleClassName(handlerType) + "#0"; } + private int findCtxIdx(Predicate predicate) { + for (int i = 0; i < handlers.size(); i++) { + if (predicate.test(handlers.get(i))) { + return i; + } + } + return -1; + } + @Override public final ChannelPipeline remove(ChannelHandler handler) { - remove(getContextOrDie(handler)); + final AbstractChannelHandlerContext ctx; + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = findCtxIdx(context -> context.handler() == handler); + if (idx == -1) { + throw new NoSuchElementException(); + } + ctx = handlers.remove(idx); + assert ctx != null; + + if (!inEventLoop) { + try { + executor.execute(() -> remove0(ctx)); + return this; + } catch (Throwable cause) { + handlers.add(idx, ctx); + throw cause; + } + } + } + + remove0(ctx); return this; } @Override public final ChannelHandler remove(String name) { - return remove(getContextOrDie(name)).handler(); + final AbstractChannelHandlerContext ctx; + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = findCtxIdx(context -> context.name().equals(name)); + if (idx == -1) { + throw new NoSuchElementException(); + } + ctx = handlers.remove(idx); + assert ctx != null; + + if (!inEventLoop) { + try { + executor.execute(() -> remove0(ctx)); + return ctx.handler(); + } catch (Throwable cause) { + handlers.add(idx, ctx); + throw cause; + } + } + } + + remove0(ctx); + return ctx.handler(); } @SuppressWarnings("unchecked") @Override public final T remove(Class handlerType) { - return (T) remove(getContextOrDie(handlerType)).handler(); + final AbstractChannelHandlerContext ctx; + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = findCtxIdx(context -> handlerType.isAssignableFrom(context.handler().getClass())); + if (idx == -1) { + throw new NoSuchElementException(); + } + ctx = handlers.remove(idx); + assert ctx != null; + + if (!inEventLoop) { + try { + executor.execute(() -> remove0(ctx)); + return (T) ctx.handler(); + } catch (Throwable cause) { + handlers.add(idx, ctx); + throw cause; + } + } + } + + remove0(ctx); + return (T) ctx.handler(); } public final T removeIfExists(String name) { - return removeIfExists(context(name)); + return removeIfExists(() -> findCtxIdx(context -> name.equals(context.name()))); } public final T removeIfExists(Class handlerType) { - return removeIfExists(context(handlerType)); + return removeIfExists(() -> findCtxIdx( + context -> handlerType.isAssignableFrom(context.handler().getClass()))); } public final T removeIfExists(ChannelHandler handler) { - return removeIfExists(context(handler)); + return removeIfExists(() -> findCtxIdx(context -> handler == context.handler())); } @SuppressWarnings("unchecked") - private T removeIfExists(ChannelHandlerContext ctx) { - if (ctx == null) { - return null; - } - return (T) remove((AbstractChannelHandlerContext) ctx).handler(); - } + private T removeIfExists(IntSupplier idxSupplier) { + final AbstractChannelHandlerContext ctx; + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = idxSupplier.getAsInt(); + if (idx == -1) { + return null; + } + ctx = handlers.remove(idx); + assert ctx != null; - private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { - assert ctx != head && ctx != tail; - - synchronized (this) { - remove0(ctx); - - EventExecutor executor = ctx.executor(); - if (!executor.inEventLoop()) { - executor.execute(() -> callHandlerRemoved0(ctx)); - return ctx; + if (!inEventLoop) { + try { + executor.execute(() -> remove0(ctx)); + return (T) ctx.handler(); + } catch (Throwable cause) { + handlers.add(idx, ctx); + throw cause; + } } } - callHandlerRemoved0(ctx); - return ctx; + remove0(ctx); + return (T) ctx.handler(); } - private static void remove0(AbstractChannelHandlerContext ctx) { + private void unlink(AbstractChannelHandlerContext ctx) { + assert ctx != head && ctx != tail; AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; } - @Override - public final ChannelHandler removeFirst() { - if (head.next == tail) { - throw new NoSuchElementException(); - } - return remove(head.next).handler(); - } - - @Override - public final ChannelHandler removeLast() { - if (head.next == tail) { - throw new NoSuchElementException(); - } - return remove(tail.prev).handler(); + private void remove0(AbstractChannelHandlerContext ctx) { + unlink(ctx); + callHandlerRemoved0(ctx); } @Override public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { - replace(getContextOrDie(oldHandler), newName, newHandler); + replace(ctx -> ctx.handler() == oldHandler, newName, newHandler); return this; } @Override public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { - return replace(getContextOrDie(oldName), newName, newHandler); + return replace(ctx -> ctx.name().equals(oldName), newName, newHandler); } @Override @SuppressWarnings("unchecked") public final T replace( Class oldHandlerType, String newName, ChannelHandler newHandler) { - return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler); + return (T) replace(ctx -> oldHandlerType.isAssignableFrom(ctx.handler().getClass()), newName, newHandler); } private ChannelHandler replace( - final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) { - assert ctx != head && ctx != tail; + Predicate predicate, String newName, ChannelHandler newHandler) { + checkMultiplicity(newHandler); - final AbstractChannelHandlerContext newCtx; - synchronized (this) { - checkMultiplicity(newHandler); - if (newName == null) { - newName = generateName(newHandler); - } else { - boolean sameName = ctx.name().equals(newName); - if (!sameName) { - checkDuplicateName(newName); + if (newName == null) { + newName = generateName(newHandler); + } + AbstractChannelHandlerContext oldCtx; + AbstractChannelHandlerContext newCtx = newContext(newName, newHandler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = findCtxIdx(predicate); + if (idx == -1) { + throw new NoSuchElementException(); + } + oldCtx = handlers.get(idx); + assert oldCtx != head && oldCtx != tail && oldCtx != null; + + if (!oldCtx.name().equals(newName)) { + if (context(newName) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + newName); } } + AbstractChannelHandlerContext removed = handlers.set(idx, newCtx); + assert removed != null; - newCtx = newContext(ctx.executor, newName, newHandler); - - replace0(ctx, newCtx); - - EventExecutor executor = ctx.executor(); - if (!executor.inEventLoop()) { - executor.execute(() -> { - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and - // those event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - }); - return ctx.handler(); + if (!inEventLoop) { + try { + executor.execute(() -> replace0(oldCtx, newCtx)); + return oldCtx.handler(); + } catch (Throwable cause) { + handlers.set(idx, oldCtx); + throw cause; + } } } - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those - // event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - return ctx.handler(); + + replace0(oldCtx, newCtx); + return oldCtx.handler(); } - private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) { + private void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = oldCtx.prev; AbstractChannelHandlerContext next = oldCtx.next; newCtx.prev = prev; @@ -492,6 +587,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { // update the reference to the replacement so forward of buffered content will work correctly oldCtx.prev = newCtx; oldCtx.next = newCtx; + + // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) + // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those + // event handlers must be called after handlerAdded(). + callHandlerAdded0(newCtx); + callHandlerRemoved0(oldCtx); } private static void checkMultiplicity(ChannelHandler handler) { @@ -512,8 +613,13 @@ public class DefaultChannelPipeline implements ChannelPipeline { } catch (Throwable t) { boolean removed = false; try { - remove0(ctx); + synchronized (handlers) { + handlers.remove(ctx); + } + + unlink(ctx); ctx.callHandlerRemoved(); + removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { @@ -543,61 +649,27 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - @Override - public final ChannelHandler first() { - ChannelHandlerContext first = firstContext(); - if (first == null) { - return null; - } - return first.handler(); - } - - @Override - public final ChannelHandlerContext firstContext() { - AbstractChannelHandlerContext first = head.next; - if (first == tail) { - return null; - } - return head.next; - } - - @Override - public final ChannelHandler last() { - AbstractChannelHandlerContext last = tail.prev; - if (last == head) { - return null; - } - return last.handler(); - } - - @Override - public final ChannelHandlerContext lastContext() { - AbstractChannelHandlerContext last = tail.prev; - if (last == head) { - return null; - } - return last; - } - @Override public final ChannelHandler get(String name) { ChannelHandlerContext ctx = context(name); - if (ctx == null) { - return null; - } else { - return ctx.handler(); - } + return ctx == null ? null : ctx.handler(); } @SuppressWarnings("unchecked") @Override public final T get(Class handlerType) { ChannelHandlerContext ctx = context(handlerType); - if (ctx == null) { - return null; - } else { - return (T) ctx.handler(); + return ctx == null ? null : (T) ctx.handler(); + } + + private AbstractChannelHandlerContext findCtx(Predicate predicate) { + for (int i = 0; i < handlers.size(); i++) { + AbstractChannelHandlerContext ctx = handlers.get(i); + if (predicate.test(ctx)) { + return ctx; + } } + return null; } @Override @@ -605,8 +677,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (name == null) { throw new NullPointerException("name"); } - - return context0(name); + synchronized (handlers) { + return findCtx(ctx -> ctx.name().equals(name)); + } } @Override @@ -615,18 +688,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NullPointerException("handler"); } - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - - if (ctx == null) { - return null; - } - - if (ctx.handler() == handler) { - return ctx; - } - - ctx = ctx.next; + synchronized (handlers) { + return findCtx(ctx -> ctx.handler() == handler); } } @@ -636,49 +699,22 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NullPointerException("handlerType"); } - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - if (ctx == null) { - return null; - } - if (handlerType.isAssignableFrom(ctx.handler().getClass())) { - return ctx; - } - ctx = ctx.next; + synchronized (handlers) { + return findCtx(ctx -> handlerType.isAssignableFrom(ctx.handler().getClass())); } } @Override public final List names() { - List list = new ArrayList<>(); - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - if (ctx == null) { - return list; + synchronized (handlers) { + List names = new ArrayList<>(handlers.size()); + for (int i = 0; i < handlers.size(); i++) { + names.add(handlers.get(i).name()); } - list.add(ctx.name()); - ctx = ctx.next; + return names; } } - @Override - public final Map toMap() { - Map map = new LinkedHashMap<>(); - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - if (ctx == tail) { - return map; - } - map.put(ctx.name(), ctx.handler()); - ctx = ctx.next; - } - } - - @Override - public final Iterator> iterator() { - return toMap().entrySet().iterator(); - } - /** * Returns the {@link String} representation of this pipeline. */ @@ -687,140 +723,168 @@ public class DefaultChannelPipeline implements ChannelPipeline { StringBuilder buf = new StringBuilder() .append(StringUtil.simpleClassName(this)) .append('{'); - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - if (ctx == tail) { - break; + synchronized (handlers) { + if (!handlers.isEmpty()) { + for (int i = 0; i < handlers.size(); i++) { + AbstractChannelHandlerContext ctx = handlers.get(i); + + buf.append('(') + .append(ctx.name()) + .append(" = ") + .append(ctx.handler().getClass().getName()) + .append("), "); + } + buf.setLength(buf.length() - 2); } - - buf.append('(') - .append(ctx.name()) - .append(" = ") - .append(ctx.handler().getClass().getName()) - .append(')'); - - ctx = ctx.next; - if (ctx == tail) { - break; - } - - buf.append(", "); } buf.append('}'); return buf.toString(); } + @Override + public ChannelHandler removeFirst() { + synchronized (handlers) { + if (handlers.isEmpty()) { + throw new NoSuchElementException(); + } + return handlers.remove(0).handler(); + } + } + + @Override + public ChannelHandler removeLast() { + synchronized (handlers) { + if (handlers.isEmpty()) { + throw new NoSuchElementException(); + } + return handlers.remove(handlers.size() - 1).handler(); + } + } + + @Override + public ChannelHandler first() { + ChannelHandlerContext ctx = firstContext(); + return ctx == null ? null : ctx.handler(); + } + + @Override + public ChannelHandlerContext firstContext() { + synchronized (handlers) { + return handlers.isEmpty() ? null : handlers.get(0); + } + } + + @Override + public ChannelHandler last() { + ChannelHandlerContext ctx = lastContext(); + return ctx == null ? null : ctx.handler(); + } + + @Override + public ChannelHandlerContext lastContext() { + synchronized (handlers) { + return handlers.isEmpty() ? null : handlers.get(handlers.size() - 1); + } + } + + @Override + public Map toMap() { + Map map; + synchronized (handlers) { + if (handlers.isEmpty()) { + return Collections.emptyMap(); + } + map = new LinkedHashMap<>(handlers.size()); + for (int i = 0; i < handlers.size(); i++) { + ChannelHandlerContext ctx = handlers.get(i); + map.put(ctx.name(), ctx.handler()); + } + return map; + } + } + + @Override + public Iterator> iterator() { + return toMap().entrySet().iterator(); + } + @Override public final ChannelPipeline fireChannelRegistered() { - AbstractChannelHandlerContext.invokeChannelRegistered(head); + head.invokeChannelRegistered(); return this; } @Override public final ChannelPipeline fireChannelUnregistered() { - AbstractChannelHandlerContext.invokeChannelUnregistered(head); + head.invokeChannelUnregistered(); return this; } /** * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger * handlerRemoved(). - * - * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)}) - * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that - * the handlers are removed after all events are handled. - * - * See: https://github.com/netty/netty/issues/3156 */ - private synchronized void destroy() { - destroyUp(head.next, false); - } - - private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) { - final Thread currentThread = Thread.currentThread(); - final AbstractChannelHandlerContext tail = this.tail; - for (;;) { - if (ctx == tail) { - destroyDown(currentThread, tail.prev, inEventLoop); - break; - } - - final EventExecutor executor = ctx.executor(); - if (!inEventLoop && !executor.inEventLoop(currentThread)) { - final AbstractChannelHandlerContext finalCtx = ctx; - executor.execute(() -> destroyUp(finalCtx, true)); - break; - } - - ctx = ctx.next; - inEventLoop = false; + private void destroy() { + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + destroy0(); + } else { + executor.execute(this::destroy0); } } - private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) { - // We have reached at tail; now traverse backwards. - final AbstractChannelHandlerContext head = this.head; - for (;;) { - if (ctx == head) { - break; - } - - final EventExecutor executor = ctx.executor(); - if (inEventLoop || executor.inEventLoop(currentThread)) { - synchronized (this) { - remove0(ctx); - } - callHandlerRemoved0(ctx); - } else { - final AbstractChannelHandlerContext finalCtx = ctx; - executor.execute(() -> destroyDown(Thread.currentThread(), finalCtx, true)); - break; + private void destroy0() { + assert executor().inEventLoop(); + AbstractChannelHandlerContext ctx = this.tail.prev; + while (ctx != head) { + synchronized (handlers) { + handlers.remove(ctx); } + remove0(ctx); ctx = ctx.prev; - inEventLoop = false; } } @Override public final ChannelPipeline fireChannelActive() { - AbstractChannelHandlerContext.invokeChannelActive(head); + head.invokeChannelActive(); return this; } @Override public final ChannelPipeline fireChannelInactive() { - AbstractChannelHandlerContext.invokeChannelInactive(head); + head.invokeChannelInactive(); return this; } @Override public final ChannelPipeline fireExceptionCaught(Throwable cause) { - AbstractChannelHandlerContext.invokeExceptionCaught(head, cause); + head.invokeExceptionCaught(cause); return this; } @Override public final ChannelPipeline fireUserEventTriggered(Object event) { - AbstractChannelHandlerContext.invokeUserEventTriggered(head, event); + head.invokeUserEventTriggered(event); return this; } @Override public final ChannelPipeline fireChannelRead(Object msg) { - AbstractChannelHandlerContext.invokeChannelRead(head, msg); + head.invokeChannelRead(msg); return this; } @Override public final ChannelPipeline fireChannelReadComplete() { - AbstractChannelHandlerContext.invokeChannelReadComplete(head); + head.invokeChannelReadComplete(); return this; } @Override public final ChannelPipeline fireChannelWritabilityChanged() { - AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head); + head.invokeChannelWritabilityChanged(); return this; } @@ -929,12 +993,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPromise newPromise() { - return new DefaultChannelPromise(channel); + return new DefaultChannelPromise(channel(), executor()); } @Override public final ChannelProgressivePromise newProgressivePromise() { - return new DefaultChannelProgressivePromise(channel); + return new DefaultChannelProgressivePromise(channel(), executor()); } @Override @@ -944,7 +1008,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelFuture newFailedFuture(Throwable cause) { - return new FailedChannelFuture(channel, null, cause); + return new FailedChannelFuture(channel(), executor(), cause); } @Override @@ -952,50 +1016,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { return voidPromise; } - private void checkDuplicateName(String name) { - if (context0(name) != null) { - throw new IllegalArgumentException("Duplicate handler name: " + name); - } - } - - private AbstractChannelHandlerContext context0(String name) { - AbstractChannelHandlerContext context = head.next; - while (context != tail) { - if (context.name().equals(name)) { - return context; - } - context = context.next; - } - return null; - } - - private AbstractChannelHandlerContext getContextOrDie(String name) { - AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name); - if (ctx == null) { - throw new NoSuchElementException(name); - } else { - return ctx; - } - } - - private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) { - AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); - if (ctx == null) { - throw new NoSuchElementException(handler.getClass().getName()); - } else { - return ctx; - } - } - - private AbstractChannelHandlerContext getContextOrDie(Class handlerType) { - AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType); - if (ctx == null) { - throw new NoSuchElementException(handlerType.getName()); - } else { - return ctx; - } - } - /** * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. @@ -1085,10 +1105,15 @@ public class DefaultChannelPipeline implements ChannelPipeline { final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { - super(pipeline, null, TAIL_NAME, TailContext.class); + super(pipeline, TAIL_NAME, TailContext.class); setAddComplete(); } + @Override + public EventExecutor executor() { + return pipeline().executor(); + } + @Override public ChannelHandler handler() { return this; @@ -1148,11 +1173,16 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { - super(pipeline, null, HEAD_NAME, HeadContext.class); + super(pipeline, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); } + @Override + public EventExecutor executor() { + return pipeline().executor(); + } + @Override public ChannelHandler handler() { return this; diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java index 69cc007bec..ef6fa7c9ce 100644 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java @@ -30,44 +30,6 @@ import static org.junit.Assert.*; public abstract class AbstractEventLoopTest { - /** - * Test for https://github.com/netty/netty/issues/803 - */ - @Test - public void testReregister() { - EventLoopGroup group = newEventLoopGroup(); - final EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(2); - - try { - ServerBootstrap bootstrap = new ServerBootstrap(); - ChannelFuture future = bootstrap.channel(newChannel()).group(group) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - } - }).handler(new ChannelInitializer() { - @Override - public void initChannel(ServerSocketChannel ch) throws Exception { - ch.pipeline().addLast(new TestChannelHandler()); - ch.pipeline().addLast(eventExecutorGroup.next(), new TestChannelHandler2()); - } - }) - .bind(0).awaitUninterruptibly(); - - EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor(); - EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor(); - - future.channel().deregister().awaitUninterruptibly(); - Channel channel = future.channel().register().awaitUninterruptibly().channel(); - EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor(); - assertSame(executor1, executorNew); - assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor()); - } finally { - group.shutdownGracefully(); - eventExecutorGroup.shutdownGracefully(); - } - } - @Test(timeout = 5000) public void testShutdownGracefullyNoQuietPeriod() throws Exception { EventLoopGroup loop = newEventLoopGroup(); @@ -86,13 +48,6 @@ public abstract class AbstractEventLoopTest { assertTrue(loop.isTerminated()); } - private static final class TestChannelHandler extends ChannelDuplexHandler { } - - private static final class TestChannelHandler2 extends ChannelDuplexHandler { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } - } - protected abstract EventLoopGroup newEventLoopGroup(); protected abstract Class newChannel(); } diff --git a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java index 07bf65e0e4..3360d4ed14 100644 --- a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java @@ -22,26 +22,14 @@ import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalHandler; import io.netty.channel.local.LocalServerChannel; -import io.netty.util.concurrent.AbstractEventExecutor; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ScheduledFuture; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.nio.channels.ClosedChannelException; -import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -49,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; public class ChannelInitializerTest { @@ -258,186 +245,6 @@ public class ChannelInitializerTest { } } - @SuppressWarnings("deprecation") - @Test(timeout = 10000) - public void testChannelInitializerEventExecutor() throws Throwable { - final AtomicInteger invokeCount = new AtomicInteger(); - final AtomicInteger completeCount = new AtomicInteger(); - final AtomicReference errorRef = new AtomicReference<>(); - LocalAddress addr = new LocalAddress("test"); - - final EventExecutor executor = new AbstractEventExecutor() { - private final ScheduledExecutorService execService = Executors.newSingleThreadScheduledExecutor(); - - @Override - public boolean inEventLoop(Thread thread) { - return false; - } - - @Override - public boolean isShuttingDown() { - return execService.isShutdown(); - } - - @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { - shutdown(); - return newSucceededFuture(null); - } - - @Override - public Future terminationFuture() { - return newFailedFuture(new UnsupportedOperationException()); - } - - @Override - public void shutdown() { - execService.shutdown(); - } - - @Override - public List shutdownNow() { - return execService.shutdownNow(); - } - - @Override - public boolean isShutdown() { - return execService.isShutdown(); - } - - @Override - public boolean isTerminated() { - return execService.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return execService.awaitTermination(timeout, unit); - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - return execService.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return execService.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { - return execService.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return execService.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - execService.execute(command); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - }; - - final CountDownLatch latch = new CountDownLatch(1); - ServerBootstrap serverBootstrap = new ServerBootstrap() - .channel(LocalServerChannel.class) - .group(group) - .localAddress(addr) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(LocalChannel ch) { - ch.pipeline().addLast(executor, new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) { - invokeCount.incrementAndGet(); - ChannelHandlerContext ctx = ch.pipeline().context(this); - assertNotNull(ctx); - ch.pipeline().addAfter(ctx.executor(), - ctx.name(), null, new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - // just drop on the floor. - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) { - latch.countDown(); - } - }); - completeCount.incrementAndGet(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (cause instanceof AssertionError) { - errorRef.set(cause); - } - } - }); - } - }); - - Channel server = serverBootstrap.bind().sync().channel(); - - Bootstrap clientBootstrap = new Bootstrap() - .channel(LocalChannel.class) - .group(group) - .remoteAddress(addr) - .handler(new ChannelInboundHandlerAdapter()); - - Channel client = clientBootstrap.connect().sync().channel(); - client.writeAndFlush("Hello World").sync(); - - client.close().sync(); - server.close().sync(); - - client.closeFuture().sync(); - server.closeFuture().sync(); - - // Wait until the handler is removed from the pipeline and so no more events are handled by it. - latch.await(); - - assertEquals(1, invokeCount.get()); - assertEquals(invokeCount.get(), completeCount.get()); - - Throwable cause = errorRef.get(); - if (cause != null) { - throw cause; - } - - executor.shutdown(); - assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); - } - private static void closeChannel(Channel c) { if (c != null) { c.close().syncUninterruptibly(); diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java index dfae3be5b2..c769cbf963 100644 --- a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -19,17 +19,11 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.RejectedExecutionHandlers; -import io.netty.util.concurrent.SingleThreadEventExecutor; import org.junit.Test; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import static io.netty.buffer.Unpooled.*; import static org.hamcrest.Matchers.*; @@ -388,87 +382,6 @@ public class ChannelOutboundBufferTest { safeClose(ch); } - @Test(timeout = 5000) - public void testWriteTaskRejected() throws Exception { - final SingleThreadEventExecutor executor = new SingleThreadEventExecutor( - new DefaultThreadFactory("executorPool"), - 1, RejectedExecutionHandlers.reject()) { - - @Override - protected Queue newTaskQueue(int maxPendingTasks) { - return super.newTaskQueue(1); - } - }; - final CountDownLatch handlerAddedLatch = new CountDownLatch(1); - final CountDownLatch handlerRemovedLatch = new CountDownLatch(1); - EmbeddedChannel ch = new EmbeddedChannel(); - ch.pipeline().addLast(executor, "handler", new ChannelOutboundHandlerAdapter() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - promise.setFailure(new AssertionError("Should not be called")); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - handlerAddedLatch.countDown(); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) { - handlerRemovedLatch.countDown(); - } - }); - - // Lets wait until we are sure the handler was added. - handlerAddedLatch.await(); - - final CountDownLatch executeLatch = new CountDownLatch(1); - final CountDownLatch runLatch = new CountDownLatch(1); - executor.execute(() -> { - try { - runLatch.countDown(); - executeLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - - runLatch.await(); - - executor.execute(() -> { - // Will not be executed but ensure the pending count is 1. - }); - - assertEquals(1, executor.pendingTasks()); - assertEquals(0, ch.unsafe().outboundBuffer().totalPendingWriteBytes()); - - ByteBuf buffer = buffer(128).writeZero(128); - ChannelFuture future = ch.write(buffer); - ch.runPendingTasks(); - - assertTrue(future.cause() instanceof RejectedExecutionException); - assertEquals(0, buffer.refCnt()); - - // In case of rejected task we should not have anything pending. - assertEquals(0, ch.unsafe().outboundBuffer().totalPendingWriteBytes()); - executeLatch.countDown(); - - while (executor.pendingTasks() != 0) { - // Wait until there is no more pending task left. - Thread.sleep(10); - } - - ch.pipeline().remove("handler"); - - // Ensure we do not try to shutdown the executor before we handled everything for the Channel. Otherwise - // the Executor may reject when the Channel tries to add a task to it. - handlerRemovedLatch.await(); - - safeClose(ch); - - executor.shutdownGracefully(); - } - private static void safeClose(EmbeddedChannel ch) { ch.finish(); for (;;) { diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 301a673bfa..eb53fa7439 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -32,14 +32,11 @@ import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.AbstractEventExecutor; -import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; -import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor; import org.junit.After; import org.junit.AfterClass; import org.junit.Test; @@ -51,21 +48,17 @@ import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.LockSupport; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -672,37 +665,6 @@ public class DefaultChannelPipelineTest { assertSame(exception, error.get()); } - @Test - public void testChannelUnregistrationWithCustomExecutor() throws Exception { - final CountDownLatch channelLatch = new CountDownLatch(1); - final CountDownLatch handlerLatch = new CountDownLatch(1); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new WrapperExecutor(), - new ChannelInboundHandlerAdapter() { - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - channelLatch.countDown(); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - handlerLatch.countDown(); - } - }); - } - }); - Channel channel = pipeline.channel(); - channel.register().sync(); - channel.close(); - channel.deregister(); - assertTrue(channelLatch.await(2, TimeUnit.SECONDS)); - assertTrue(handlerLatch.await(2, TimeUnit.SECONDS)); - } - @Test(timeout = 3000) public void testAddHandlerBeforeRegisteredThenRemove() { final EventLoop loop = group.next(); @@ -740,129 +702,6 @@ public class DefaultChannelPipelineTest { pipeline.channel().close().syncUninterruptibly(); } - @Test(timeout = 3000) - public void testHandlerAddedAndRemovedCalledInCorrectOrder() throws Throwable { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - final EventExecutorGroup group2 = new DefaultEventExecutorGroup(1); - - try { - BlockingQueue addedQueue = new LinkedBlockingQueue<>(); - BlockingQueue removedQueue = new LinkedBlockingQueue<>(); - - CheckOrderHandler handler1 = new CheckOrderHandler(addedQueue, removedQueue); - CheckOrderHandler handler2 = new CheckOrderHandler(addedQueue, removedQueue); - CheckOrderHandler handler3 = new CheckOrderHandler(addedQueue, removedQueue); - CheckOrderHandler handler4 = new CheckOrderHandler(addedQueue, removedQueue); - - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(handler1); - pipeline.channel().register().syncUninterruptibly(); - pipeline.addLast(group1.next(), handler2); - pipeline.addLast(group2.next(), handler3); - pipeline.addLast(handler4); - - assertTrue(removedQueue.isEmpty()); - pipeline.channel().close().syncUninterruptibly(); - assertHandler(addedQueue.take(), handler1); - - // Depending on timing this can be handler2 or handler3 as these use different EventExecutorGroups. - assertHandler(addedQueue.take(), handler2, handler3, handler4); - assertHandler(addedQueue.take(), handler2, handler3, handler4); - assertHandler(addedQueue.take(), handler2, handler3, handler4); - - assertTrue(addedQueue.isEmpty()); - - assertHandler(removedQueue.take(), handler4); - assertHandler(removedQueue.take(), handler3); - assertHandler(removedQueue.take(), handler2); - assertHandler(removedQueue.take(), handler1); - assertTrue(removedQueue.isEmpty()); - } finally { - group1.shutdownGracefully(); - group2.shutdownGracefully(); - } - } - - @Test(timeout = 3000) - public void testHandlerAddedExceptionFromChildHandlerIsPropagated() { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - try { - final Promise promise = group1.next().newPromise(); - final Exception exception = new RuntimeException(); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise)); - pipeline.addFirst(new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - throw exception; - } - }); - pipeline.channel().register(); - promise.syncUninterruptibly(); - pipeline.channel().close().syncUninterruptibly(); - } finally { - group1.shutdownGracefully(); - } - } - - @Test(timeout = 3000) - public void testHandlerRemovedExceptionFromChildHandlerIsPropagated() { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - try { - final Promise promise = group1.next().newPromise(); - String handlerName = "foo"; - final Exception exception = new RuntimeException(); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(handlerName, new ChannelHandlerAdapter() { - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - throw exception; - } - }); - pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise)); - pipeline.channel().register().syncUninterruptibly(); - pipeline.remove(handlerName); - promise.syncUninterruptibly(); - pipeline.channel().close().syncUninterruptibly(); - } finally { - group1.shutdownGracefully(); - } - } - - @Test(timeout = 3000) - public void testHandlerAddedThrowsAndRemovedThrowsException() throws InterruptedException { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - try { - final CountDownLatch latch = new CountDownLatch(1); - final Promise promise = group1.next().newPromise(); - final Exception exceptionAdded = new RuntimeException(); - final Exception exceptionRemoved = new RuntimeException(); - String handlerName = "foo"; - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(group1.next(), new CheckExceptionHandler(exceptionAdded, promise)); - pipeline.addFirst(handlerName, new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - throw exceptionAdded; - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - // Execute this later so we are sure the exception is handled first. - ctx.executor().execute(latch::countDown); - throw exceptionRemoved; - } - }); - pipeline.register().syncUninterruptibly(); - latch.await(); - assertNull(pipeline.context(handlerName)); - promise.syncUninterruptibly(); - pipeline.channel().close().syncUninterruptibly(); - } finally { - group1.shutdownGracefully(); - } - } - @Test(timeout = 2000) public void testAddRemoveHandlerCalled() throws Throwable { ChannelPipeline pipeline = newLocalChannel().pipeline(); @@ -1006,76 +845,6 @@ public class DefaultChannelPipelineTest { pipeline.addBefore("test", null, newHandler()); } - @Test(timeout = 3000) - public void testUnorderedEventExecutor() throws Throwable { - EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(2); - EventLoopGroup defaultGroup = new MultithreadEventLoopGroup(1, LocalHandler.newFactory()); - try { - EventLoop eventLoop1 = defaultGroup.next(); - ChannelPipeline pipeline1 = new LocalChannel(eventLoop1).pipeline(); - - pipeline1.channel().register().syncUninterruptibly(); - final CountDownLatch latch = new CountDownLatch(1); - pipeline1.addLast(eventExecutors.next(), new ChannelInboundHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - // Just block one of the two threads. - LockSupport.park(); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - latch.countDown(); - } - }); - // Trigger an event, as we use UnorderedEventExecutor userEventTriggered should be called even when - // handlerAdded(...) blocks. - pipeline1.fireUserEventTriggered(""); - latch.await(); - } finally { - defaultGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).syncUninterruptibly(); - eventExecutors.shutdownGracefully(0, 0, TimeUnit.SECONDS).syncUninterruptibly(); - } - } - - @Test - public void testPinExecutor() { - EventExecutorGroup group = new DefaultEventExecutorGroup(2); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - ChannelPipeline pipeline2 = newLocalChannel().pipeline(); - - EventExecutor executor = group.next(); - pipeline.addLast(executor, "h1", new ChannelInboundHandlerAdapter()); - pipeline.addLast(executor, "h2", new ChannelInboundHandlerAdapter()); - pipeline2.addLast(group.next(), "h3", new ChannelInboundHandlerAdapter()); - - EventExecutor executor1 = pipeline.context("h1").executor(); - EventExecutor executor2 = pipeline.context("h2").executor(); - assertNotNull(executor1); - assertNotNull(executor2); - assertSame(executor1, executor2); - EventExecutor executor3 = pipeline2.context("h3").executor(); - assertNotNull(executor3); - assertNotSame(executor3, executor2); - group.shutdownGracefully(0, 0, TimeUnit.SECONDS); - } - - @Test - public void testNotPinExecutor() { - EventExecutorGroup group = new DefaultEventExecutorGroup(2); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - - pipeline.addLast(group.next(), "h1", new ChannelInboundHandlerAdapter()); - pipeline.addLast(group.next(), "h2", new ChannelInboundHandlerAdapter()); - - EventExecutor executor1 = pipeline.context("h1").executor(); - EventExecutor executor2 = pipeline.context("h2").executor(); - assertNotNull(executor1); - assertNotNull(executor2); - assertNotSame(executor1, executor2); - group.shutdownGracefully(0, 0, TimeUnit.SECONDS); - } - @Test(timeout = 3000) public void testVoidPromiseNotify() throws Throwable { EventLoopGroup defaultGroup = new MultithreadEventLoopGroup(1, LocalHandler.newFactory()); @@ -1832,13 +1601,21 @@ public class DefaultChannelPipelineTest { } private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) { - AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext(); - int handlerNumber = 0; - while (ctx != ((DefaultChannelPipeline) pipeline).tail) { - handlerNumber++; - ctx = ctx.next; - } - assertEquals(expectedNumber, handlerNumber); + assertEquals(expectedNumber, pipeline.names().size()); + assertEquals(expectedNumber, pipeline.toMap().size()); + + pipeline.executor().submit(new Runnable() { + @Override + public void run() { + AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext(); + int handlerNumber = 0; + while (ctx != ((DefaultChannelPipeline) pipeline).tail) { + handlerNumber++; + ctx = ctx.next; + } + assertEquals(expectedNumber, handlerNumber); + } + }).syncUninterruptibly(); } private static ChannelHandler[] newHandlers(int num) { diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java deleted file mode 100644 index ba5f8e8549..0000000000 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ /dev/null @@ -1,597 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.HashSet; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicReference; - -public class LocalTransportThreadModelTest { - - private static EventLoopGroup group; - private static LocalAddress localAddr; - - @BeforeClass - public static void init() { - // Configure a test server - group = new MultithreadEventLoopGroup(LocalHandler.newFactory()); - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - // Discard - ReferenceCountUtil.release(msg); - } - }); - } - }); - - localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress(); - } - - @AfterClass - public static void destroy() throws Exception { - group.shutdownGracefully().sync(); - } - - @Test(timeout = 30000) - @Ignore("regression test") - public void testStagedExecutionMultiple() throws Throwable { - for (int i = 0; i < 10; i ++) { - testStagedExecution(); - } - } - - @Test(timeout = 5000) - public void testStagedExecution() throws Throwable { - EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"), - LocalHandler.newFactory()); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - ThreadNameAuditor h1 = new ThreadNameAuditor(); - ThreadNameAuditor h2 = new ThreadNameAuditor(); - ThreadNameAuditor h3 = new ThreadNameAuditor(true); - - Channel ch = new LocalChannel(l.next()); - // With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'. - ch.pipeline().addLast(h1); - // h2 will be always invoked by EventExecutor 'e1'. - ch.pipeline().addLast(e1.next(), h2); - // h3 will be always invoked by EventExecutor 'e2'. - ch.pipeline().addLast(e2.next(), h3); - - ch.register().sync().channel().connect(localAddr).sync(); - - // Fire inbound events from all possible starting points. - ch.pipeline().fireChannelRead("1"); - ch.pipeline().context(h1).fireChannelRead("2"); - ch.pipeline().context(h2).fireChannelRead("3"); - ch.pipeline().context(h3).fireChannelRead("4"); - // Fire outbound events from all possible starting points. - ch.pipeline().write("5"); - ch.pipeline().context(h3).write("6"); - ch.pipeline().context(h2).write("7"); - ch.pipeline().context(h1).writeAndFlush("8").sync(); - - ch.close().sync(); - - // Wait until all events are handled completely. - while (h1.outboundThreadNames.size() < 3 || h3.inboundThreadNames.size() < 3 || - h1.removalThreadNames.size() < 1) { - if (h1.exception.get() != null) { - throw h1.exception.get(); - } - if (h2.exception.get() != null) { - throw h2.exception.get(); - } - if (h3.exception.get() != null) { - throw h3.exception.get(); - } - - Thread.sleep(10); - } - - String currentName = Thread.currentThread().getName(); - - try { - // Events should never be handled from the current thread. - Assert.assertFalse(h1.inboundThreadNames.contains(currentName)); - Assert.assertFalse(h2.inboundThreadNames.contains(currentName)); - Assert.assertFalse(h3.inboundThreadNames.contains(currentName)); - Assert.assertFalse(h1.outboundThreadNames.contains(currentName)); - Assert.assertFalse(h2.outboundThreadNames.contains(currentName)); - Assert.assertFalse(h3.outboundThreadNames.contains(currentName)); - Assert.assertFalse(h1.removalThreadNames.contains(currentName)); - Assert.assertFalse(h2.removalThreadNames.contains(currentName)); - Assert.assertFalse(h3.removalThreadNames.contains(currentName)); - - // Assert that events were handled by the correct executor. - for (String name: h1.inboundThreadNames) { - Assert.assertTrue(name.startsWith("l-")); - } - for (String name: h2.inboundThreadNames) { - Assert.assertTrue(name.startsWith("e1-")); - } - for (String name: h3.inboundThreadNames) { - Assert.assertTrue(name.startsWith("e2-")); - } - for (String name: h1.outboundThreadNames) { - Assert.assertTrue(name.startsWith("l-")); - } - for (String name: h2.outboundThreadNames) { - Assert.assertTrue(name.startsWith("e1-")); - } - for (String name: h3.outboundThreadNames) { - Assert.assertTrue(name.startsWith("e2-")); - } - for (String name: h1.removalThreadNames) { - Assert.assertTrue(name.startsWith("l-")); - } - for (String name: h2.removalThreadNames) { - Assert.assertTrue(name.startsWith("e1-")); - } - for (String name: h3.removalThreadNames) { - Assert.assertTrue(name.startsWith("e2-")); - } - - // Assert that the events for the same handler were handled by the same thread. - Set names = new HashSet<>(); - names.addAll(h1.inboundThreadNames); - names.addAll(h1.outboundThreadNames); - names.addAll(h1.removalThreadNames); - Assert.assertEquals(1, names.size()); - - names.clear(); - names.addAll(h2.inboundThreadNames); - names.addAll(h2.outboundThreadNames); - names.addAll(h2.removalThreadNames); - Assert.assertEquals(1, names.size()); - - names.clear(); - names.addAll(h3.inboundThreadNames); - names.addAll(h3.outboundThreadNames); - names.addAll(h3.removalThreadNames); - Assert.assertEquals(1, names.size()); - - // Count the number of events - Assert.assertEquals(1, h1.inboundThreadNames.size()); - Assert.assertEquals(2, h2.inboundThreadNames.size()); - Assert.assertEquals(3, h3.inboundThreadNames.size()); - Assert.assertEquals(3, h1.outboundThreadNames.size()); - Assert.assertEquals(2, h2.outboundThreadNames.size()); - Assert.assertEquals(1, h3.outboundThreadNames.size()); - Assert.assertEquals(1, h1.removalThreadNames.size()); - Assert.assertEquals(1, h2.removalThreadNames.size()); - Assert.assertEquals(1, h3.removalThreadNames.size()); - } catch (AssertionError e) { - System.out.println("H1I: " + h1.inboundThreadNames); - System.out.println("H2I: " + h2.inboundThreadNames); - System.out.println("H3I: " + h3.inboundThreadNames); - System.out.println("H1O: " + h1.outboundThreadNames); - System.out.println("H2O: " + h2.outboundThreadNames); - System.out.println("H3O: " + h3.outboundThreadNames); - System.out.println("H1R: " + h1.removalThreadNames); - System.out.println("H2R: " + h2.removalThreadNames); - System.out.println("H3R: " + h3.removalThreadNames); - throw e; - } finally { - l.shutdownGracefully(); - e1.shutdownGracefully(); - e2.shutdownGracefully(); - - l.terminationFuture().sync(); - e1.terminationFuture().sync(); - e2.terminationFuture().sync(); - } - } - - @Test(timeout = 30000) - @Ignore - public void testConcurrentMessageBufferAccess() throws Throwable { - EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"), - LocalHandler.newFactory()); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); - EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4")); - EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5")); - - try { - final MessageForwarder1 h1 = new MessageForwarder1(); - final MessageForwarder2 h2 = new MessageForwarder2(); - final MessageForwarder3 h3 = new MessageForwarder3(); - final MessageForwarder1 h4 = new MessageForwarder1(); - final MessageForwarder2 h5 = new MessageForwarder2(); - final MessageDiscarder h6 = new MessageDiscarder(); - - final Channel ch = new LocalChannel(l.next()); - - // inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null - // outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null - ch.pipeline().addLast(h1) - .addLast(e1.next(), h2) - .addLast(e2.next(), h3) - .addLast(e3.next(), h4) - .addLast(e4.next(), h5) - .addLast(e5.next(), h6); - - ch.register().sync().channel().connect(localAddr).sync(); - - final int ROUNDS = 1024; - final int ELEMS_PER_ROUNDS = 8192; - final int TOTAL_CNT = ROUNDS * ELEMS_PER_ROUNDS; - for (int i = 0; i < TOTAL_CNT;) { - final int start = i; - final int end = i + ELEMS_PER_ROUNDS; - i = end; - - ch.eventLoop().execute(() -> { - for (int j = start; j < end; j ++) { - ch.pipeline().fireChannelRead(Integer.valueOf(j)); - } - }); - } - - while (h1.inCnt < TOTAL_CNT || h2.inCnt < TOTAL_CNT || h3.inCnt < TOTAL_CNT || - h4.inCnt < TOTAL_CNT || h5.inCnt < TOTAL_CNT || h6.inCnt < TOTAL_CNT) { - if (h1.exception.get() != null) { - throw h1.exception.get(); - } - if (h2.exception.get() != null) { - throw h2.exception.get(); - } - if (h3.exception.get() != null) { - throw h3.exception.get(); - } - if (h4.exception.get() != null) { - throw h4.exception.get(); - } - if (h5.exception.get() != null) { - throw h5.exception.get(); - } - if (h6.exception.get() != null) { - throw h6.exception.get(); - } - Thread.sleep(10); - } - - for (int i = 0; i < TOTAL_CNT;) { - final int start = i; - final int end = i + ELEMS_PER_ROUNDS; - i = end; - - ch.pipeline().context(h6).executor().execute(() -> { - for (int j = start; j < end; j ++) { - ch.write(Integer.valueOf(j)); - } - ch.flush(); - }); - } - - while (h1.outCnt < TOTAL_CNT || h2.outCnt < TOTAL_CNT || h3.outCnt < TOTAL_CNT || - h4.outCnt < TOTAL_CNT || h5.outCnt < TOTAL_CNT || h6.outCnt < TOTAL_CNT) { - if (h1.exception.get() != null) { - throw h1.exception.get(); - } - if (h2.exception.get() != null) { - throw h2.exception.get(); - } - if (h3.exception.get() != null) { - throw h3.exception.get(); - } - if (h4.exception.get() != null) { - throw h4.exception.get(); - } - if (h5.exception.get() != null) { - throw h5.exception.get(); - } - if (h6.exception.get() != null) { - throw h6.exception.get(); - } - Thread.sleep(10); - } - - ch.close().sync(); - } finally { - l.shutdownGracefully(); - e1.shutdownGracefully(); - e2.shutdownGracefully(); - e3.shutdownGracefully(); - e4.shutdownGracefully(); - e5.shutdownGracefully(); - - l.terminationFuture().sync(); - e1.terminationFuture().sync(); - e2.terminationFuture().sync(); - e3.terminationFuture().sync(); - e4.terminationFuture().sync(); - e5.terminationFuture().sync(); - } - } - - private static class ThreadNameAuditor extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - - private final Queue inboundThreadNames = new ConcurrentLinkedQueue<>(); - private final Queue outboundThreadNames = new ConcurrentLinkedQueue<>(); - private final Queue removalThreadNames = new ConcurrentLinkedQueue<>(); - private final boolean discard; - - ThreadNameAuditor() { - this(false); - } - - ThreadNameAuditor(boolean discard) { - this.discard = discard; - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - removalThreadNames.add(Thread.currentThread().getName()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundThreadNames.add(Thread.currentThread().getName()); - if (!discard) { - ctx.fireChannelRead(msg); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - outboundThreadNames.add(Thread.currentThread().getName()); - ctx.write(msg, promise); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - System.err.print('[' + Thread.currentThread().getName() + "] "); - cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } - - /** - * Converts integers into a binary stream. - */ - private static class MessageForwarder1 extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - private volatile int inCnt; - private volatile int outCnt; - private volatile Thread t; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Thread t = this.t; - if (t == null) { - this.t = Thread.currentThread(); - } else { - Assert.assertSame(t, Thread.currentThread()); - } - - ByteBuf out = ctx.alloc().buffer(4); - int m = ((Integer) msg).intValue(); - int expected = inCnt ++; - Assert.assertEquals(expected, m); - out.writeInt(m); - - ctx.fireChannelRead(out); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); - - // Don't let the write request go to the server-side channel - just swallow. - boolean swallow = this == ctx.pipeline().first(); - - ByteBuf m = (ByteBuf) msg; - int count = m.readableBytes() / 4; - for (int j = 0; j < count; j ++) { - int actual = m.readInt(); - int expected = outCnt ++; - Assert.assertEquals(expected, actual); - if (!swallow) { - ctx.write(actual); - } - } - ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise); - m.release(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - //System.err.print("[" + Thread.currentThread().getName() + "] "); - //cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } - - /** - * Converts a binary stream into integers. - */ - private static class MessageForwarder2 extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - private volatile int inCnt; - private volatile int outCnt; - private volatile Thread t; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Thread t = this.t; - if (t == null) { - this.t = Thread.currentThread(); - } else { - Assert.assertSame(t, Thread.currentThread()); - } - - ByteBuf m = (ByteBuf) msg; - int count = m.readableBytes() / 4; - for (int j = 0; j < count; j ++) { - int actual = m.readInt(); - int expected = inCnt ++; - Assert.assertEquals(expected, actual); - ctx.fireChannelRead(actual); - } - m.release(); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); - - ByteBuf out = ctx.alloc().buffer(4); - int m = (Integer) msg; - int expected = outCnt ++; - Assert.assertEquals(expected, m); - out.writeInt(m); - - ctx.write(out, promise); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - //System.err.print("[" + Thread.currentThread().getName() + "] "); - //cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } - - /** - * Simply forwards the received object to the next handler. - */ - private static class MessageForwarder3 extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - private volatile int inCnt; - private volatile int outCnt; - private volatile Thread t; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Thread t = this.t; - if (t == null) { - this.t = Thread.currentThread(); - } else { - Assert.assertSame(t, Thread.currentThread()); - } - - int actual = (Integer) msg; - int expected = inCnt ++; - Assert.assertEquals(expected, actual); - - ctx.fireChannelRead(msg); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); - - int actual = (Integer) msg; - int expected = outCnt ++; - Assert.assertEquals(expected, actual); - - ctx.write(msg, promise); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - System.err.print('[' + Thread.currentThread().getName() + "] "); - cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } - - /** - * Discards all received messages. - */ - private static class MessageDiscarder extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - private volatile int inCnt; - private volatile int outCnt; - private volatile Thread t; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Thread t = this.t; - if (t == null) { - this.t = Thread.currentThread(); - } else { - Assert.assertSame(t, Thread.currentThread()); - } - - int actual = (Integer) msg; - int expected = inCnt ++; - Assert.assertEquals(expected, actual); - } - - @Override - public void write( - ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); - - int actual = (Integer) msg; - int expected = outCnt ++; - Assert.assertEquals(expected, actual); - ctx.write(msg, promise); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - //System.err.print("[" + Thread.currentThread().getName() + "] "); - //cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } -} diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java deleted file mode 100644 index 0be7647195..0000000000 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.Deque; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedDeque; - -public class LocalTransportThreadModelTest3 { - - enum EventType { - EXCEPTION_CAUGHT, - USER_EVENT, - MESSAGE_RECEIVED_LAST, - INACTIVE, - ACTIVE, - UNREGISTERED, - REGISTERED, - MESSAGE_RECEIVED, - WRITE, - READ - } - - private static EventLoopGroup group; - private static LocalAddress localAddr; - - @BeforeClass - public static void init() { - // Configure a test server - group = new MultithreadEventLoopGroup(LocalHandler.newFactory()); - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - // Discard - ReferenceCountUtil.release(msg); - } - }); - } - }); - - localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress(); - } - - @AfterClass - public static void destroy() throws Exception { - group.shutdownGracefully().sync(); - } - - @Test(timeout = 60000) - @Ignore("regression test") - public void testConcurrentAddRemoveInboundEventsMultiple() throws Throwable { - for (int i = 0; i < 50; i ++) { - testConcurrentAddRemoveInboundEvents(); - } - } - - @Test(timeout = 60000) - @Ignore("regression test") - public void testConcurrentAddRemoveOutboundEventsMultiple() throws Throwable { - for (int i = 0; i < 50; i ++) { - testConcurrentAddRemoveOutboundEvents(); - } - } - - @Test(timeout = 30000) - @Ignore("needs a fix") - public void testConcurrentAddRemoveInboundEvents() throws Throwable { - testConcurrentAddRemove(true); - } - - @Test(timeout = 30000) - @Ignore("needs a fix") - public void testConcurrentAddRemoveOutboundEvents() throws Throwable { - testConcurrentAddRemove(false); - } - - private static void testConcurrentAddRemove(boolean inbound) throws Exception { - EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"), - LocalHandler.newFactory()); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); - EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4")); - EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5")); - - final EventExecutorGroup[] groups = {e1, e2, e3, e4, e5}; - try { - Deque events = new ConcurrentLinkedDeque<>(); - final EventForwarder h1 = new EventForwarder(); - final EventForwarder h2 = new EventForwarder(); - final EventForwarder h3 = new EventForwarder(); - final EventForwarder h4 = new EventForwarder(); - final EventForwarder h5 = new EventForwarder(); - final EventRecorder h6 = new EventRecorder(events, inbound); - - final Channel ch = new LocalChannel(l.next()); - if (!inbound) { - ch.config().setAutoRead(false); - } - ch.pipeline().addLast(e1.next(), h1) - .addLast(e1.next(), h2) - .addLast(e1.next(), h3) - .addLast(e1.next(), h4) - .addLast(e1.next(), h5) - .addLast(e1.next(), "recorder", h6); - - ch.register().sync().channel().connect(localAddr).sync(); - - final LinkedList expectedEvents = events(inbound, 8192); - - Throwable cause = new Throwable(); - - Thread pipelineModifier = new Thread(() -> { - Random random = new Random(); - - while (true) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - return; - } - if (!ch.isRegistered()) { - continue; - } - //EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)]; - ChannelHandler handler = ch.pipeline().removeFirst(); - ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder", - UUID.randomUUID().toString(), handler); - } - }); - pipelineModifier.setDaemon(true); - pipelineModifier.start(); - for (EventType event: expectedEvents) { - switch (event) { - case EXCEPTION_CAUGHT: - ch.pipeline().fireExceptionCaught(cause); - break; - case MESSAGE_RECEIVED: - ch.pipeline().fireChannelRead(""); - break; - case MESSAGE_RECEIVED_LAST: - ch.pipeline().fireChannelReadComplete(); - break; - case USER_EVENT: - ch.pipeline().fireUserEventTriggered(""); - break; - case WRITE: - ch.pipeline().write(""); - break; - case READ: - ch.pipeline().read(); - break; - } - } - - ch.close().sync(); - - while (events.peekLast() != EventType.UNREGISTERED) { - Thread.sleep(10); - } - - expectedEvents.addFirst(EventType.ACTIVE); - expectedEvents.addFirst(EventType.REGISTERED); - expectedEvents.addLast(EventType.INACTIVE); - expectedEvents.addLast(EventType.UNREGISTERED); - - for (;;) { - EventType event = events.poll(); - if (event == null) { - Assert.assertTrue("Missing events:" + expectedEvents, expectedEvents.isEmpty()); - break; - } - Assert.assertEquals(event, expectedEvents.poll()); - } - } finally { - l.shutdownGracefully(); - e1.shutdownGracefully(); - e2.shutdownGracefully(); - e3.shutdownGracefully(); - e4.shutdownGracefully(); - e5.shutdownGracefully(); - - l.terminationFuture().sync(); - e1.terminationFuture().sync(); - e2.terminationFuture().sync(); - e3.terminationFuture().sync(); - e4.terminationFuture().sync(); - e5.terminationFuture().sync(); - } - } - - private static LinkedList events(boolean inbound, int size) { - EventType[] events; - if (inbound) { - events = new EventType[] { - EventType.USER_EVENT, EventType.MESSAGE_RECEIVED, EventType.MESSAGE_RECEIVED_LAST, - EventType.EXCEPTION_CAUGHT}; - } else { - events = new EventType[] { - EventType.READ, EventType.WRITE, EventType.EXCEPTION_CAUGHT }; - } - - Random random = new Random(); - LinkedList expectedEvents = new LinkedList<>(); - for (int i = 0; i < size; i++) { - expectedEvents.add(events[random.nextInt(events.length)]); - } - return expectedEvents; - } - - @ChannelHandler.Sharable - private static final class EventForwarder extends ChannelDuplexHandler { } - - private static final class EventRecorder extends ChannelDuplexHandler { - private final Queue events; - private final boolean inbound; - - EventRecorder(Queue events, boolean inbound) { - this.events = events; - this.inbound = inbound; - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - events.add(EventType.EXCEPTION_CAUGHT); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (inbound) { - events.add(EventType.USER_EVENT); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - if (inbound) { - events.add(EventType.MESSAGE_RECEIVED_LAST); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.INACTIVE); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.ACTIVE); - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.UNREGISTERED); - } - - @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.REGISTERED); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (inbound) { - events.add(EventType.MESSAGE_RECEIVED); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (!inbound) { - events.add(EventType.WRITE); - } - promise.setSuccess(); - } - - @Override - public void read(ChannelHandlerContext ctx) { - if (!inbound) { - events.add(EventType.READ); - } - } - } -}