diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 4c4ee73faf..1781723d86 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -22,13 +22,8 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -42,29 +37,17 @@ public class SocketEchoTest extends AbstractSocketTest { private static final Random random = new Random(); static final byte[] data = new byte[1048576]; - private static EventExecutorGroup group; - static { random.nextBytes(data); } - @BeforeClass - public static void createGroup() { - group = new DefaultEventExecutorGroup(2); - } - - @AfterClass - public static void destroyGroup() throws Exception { - group.shutdownGracefully().sync(); - } - @Test(timeout = 30000) public void testSimpleEcho() throws Throwable { run(); } public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, false, false, true); + testSimpleEcho0(sb, cb, false, true); } @Test(timeout = 30000) @@ -73,25 +56,7 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, false, false, false); - } - - @Test//(timeout = 30000) - public void testSimpleEchoWithAdditionalExecutor() throws Throwable { - run(); - } - - public void testSimpleEchoWithAdditionalExecutor(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, true, false, true); - } - - @Test//(timeout = 30000) - public void testSimpleEchoWithAdditionalExecutorNotAutoRead() throws Throwable { - run(); - } - - public void testSimpleEchoWithAdditionalExecutorNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, true, false, false); + testSimpleEcho0(sb, cb, false, false); } @Test//(timeout = 30000) @@ -100,7 +65,7 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, false, true, true); + testSimpleEcho0(sb, cb, true, true); } @Test//(timeout = 30000) @@ -109,48 +74,24 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoWithVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, false, true, false); - } - - @Test(timeout = 30000) - public void testSimpleEchoWithAdditionalExecutorAndVoidPromise() throws Throwable { - run(); - } - - public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, true, true, true); + testSimpleEcho0(sb, cb, true, false); } private static void testSimpleEcho0( - ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise, boolean autoRead) + ServerBootstrap sb, Bootstrap cb, boolean voidPromise, boolean autoRead) throws Throwable { final EchoHandler sh = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead); - if (additionalExecutor) { - sb.childHandler(new ChannelInitializer() { - @Override - protected void initChannel(Channel c) throws Exception { - c.pipeline().addLast(group.next(), sh); - } - }); - cb.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel c) throws Exception { - c.pipeline().addLast(group.next(), ch); - } - }); - } else { - sb.childHandler(sh); - sb.handler(new ChannelInboundHandlerAdapter() { - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - cause.printStackTrace(); - } - }); - cb.handler(ch); - } + sb.childHandler(sh); + sb.handler(new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + } + }); + cb.handler(ch); sb.childOption(ChannelOption.AUTO_READ, autoRead); cb.option(ChannelOption.AUTO_READ, autoRead); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index 973e92be1d..73e1e6f7c6 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -35,13 +35,9 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.util.SelfSignedCertificate; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -66,7 +62,6 @@ public class SocketStartTlsTest extends AbstractSocketTest { private static final LogLevel LOG_LEVEL = LogLevel.TRACE; private static final File CERT_FILE; private static final File KEY_FILE; - private static EventExecutorGroup executor; static { SelfSignedCertificate ssc; @@ -106,16 +101,6 @@ public class SocketStartTlsTest extends AbstractSocketTest { return params; } - @BeforeClass - public static void createExecutor() { - executor = new DefaultEventExecutorGroup(2); - } - - @AfterClass - public static void shutdownExecutor() throws Exception { - executor.shutdownGracefully().sync(); - } - private final SslContext serverCtx; private final SslContext clientCtx; @@ -146,7 +131,6 @@ public class SocketStartTlsTest extends AbstractSocketTest { sb.childOption(ChannelOption.AUTO_READ, autoRead); cb.option(ChannelOption.AUTO_READ, autoRead); - final EventExecutorGroup executor = SocketStartTlsTest.executor; SSLEngine sse = serverCtx.newEngine(PooledByteBufAllocator.DEFAULT); SSLEngine cse = clientCtx.newEngine(PooledByteBufAllocator.DEFAULT); @@ -159,7 +143,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); - p.addLast(executor.next(), sh); + p.addLast(sh); } }); @@ -169,7 +153,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); - p.addLast(executor.next(), ch); + p.addLast(ch); } }); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index cd339533c0..f0facca702 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -60,6 +60,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final ChannelId id; private final Unsafe unsafe; private final ChannelPipeline pipeline; + private final ChannelFuture succeedFuture; private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false); private final CloseFuture closeFuture = new CloseFuture(this); @@ -83,6 +84,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.eventLoop = validateEventLoop(eventLoop); id = newId(); + succeedFuture = new SucceededChannelFuture(this, eventLoop); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } @@ -97,6 +99,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.eventLoop = validateEventLoop(eventLoop); this.id = id; + succeedFuture = new SucceededChannelFuture(this, eventLoop); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } @@ -325,22 +328,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public ChannelPromise newPromise() { - return pipeline.newPromise(); + return new DefaultChannelPromise(this, eventLoop); } @Override public ChannelProgressivePromise newProgressivePromise() { - return pipeline.newProgressivePromise(); + return new DefaultChannelProgressivePromise(this, eventLoop); } @Override public ChannelFuture newSucceededFuture() { - return pipeline.newSucceededFuture(); + return succeedFuture; } @Override public ChannelFuture newFailedFuture(Throwable cause) { - return pipeline.newFailedFuture(cause); + return new FailedChannelFuture(this, eventLoop, cause); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index e557f2bdca..fad6f9205e 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -24,7 +24,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.FastThreadLocal; -import io.netty.util.concurrent.OrderedEventExecutor; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.ThrowableUtil; @@ -45,24 +44,20 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); - volatile AbstractChannelHandlerContext next; - volatile AbstractChannelHandlerContext prev; + AbstractChannelHandlerContext next; + AbstractChannelHandlerContext prev; private static final AtomicIntegerFieldUpdater HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState"); - - /** - * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called. - */ - private static final int ADD_PENDING = 1; /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. */ - private static final int ADD_COMPLETE = 2; + private static final int ADD_COMPLETE = 1; + /** * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ - private static final int REMOVE_COMPLETE = 3; + private static final int REMOVE_COMPLETE = 2; /** * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. @@ -107,11 +102,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap private final DefaultChannelPipeline pipeline; private final String name; - private final boolean ordered; - // Will be set to null if no child executor should be used, otherwise it will be set to the - // child executor. - final EventExecutor executor; private ChannelFuture succeededFuture; // Lazily instantiated tasks used to trigger events to a handler with different executor. @@ -120,14 +111,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap private volatile int handlerState = INIT; - AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, + AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, String name, Class handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; - this.executor = executor; this.executionMask = mask(handlerClass); - // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. - ordered = executor == null || executor instanceof OrderedEventExecutor; } /** @@ -248,15 +236,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return channel().config().getAllocator(); } - @Override - public EventExecutor executor() { - if (executor == null) { - return channel().eventLoop(); - } else { - return executor; - } - } - @Override public String name() { return name; @@ -264,126 +243,105 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext fireChannelRegistered() { - invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED)); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelRegistered(); + } else { + executor.execute(this::findAndInvokeChannelRegistered); + } return this; } - static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelRegistered(); - } else { - executor.execute(next::invokeChannelRegistered); - } + private void findAndInvokeChannelRegistered() { + findContextInbound(MASK_CHANNEL_REGISTERED).invokeChannelRegistered(); } - private void invokeChannelRegistered() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelRegistered(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelRegistered(); + final void invokeChannelRegistered() { + try { + ((ChannelInboundHandler) handler()).channelRegistered(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelUnregistered() { - invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED)); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelUnregistered(); + } else { + executor.execute(this::findAndInvokeChannelUnregistered); + } return this; } - static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelUnregistered(); - } else { - executor.execute(next::invokeChannelUnregistered); - } + private void findAndInvokeChannelUnregistered() { + findContextInbound(MASK_CHANNEL_UNREGISTERED).invokeChannelUnregistered(); } - private void invokeChannelUnregistered() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelUnregistered(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelUnregistered(); + final void invokeChannelUnregistered() { + try { + ((ChannelInboundHandler) handler()).channelUnregistered(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelActive() { - invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE)); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelActive(); + } else { + executor.execute(this::findAndInvokeChannelActive); + } return this; } - static void invokeChannelActive(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelActive(); - } else { - executor.execute(next::invokeChannelActive); - } + private void findAndInvokeChannelActive() { + findContextInbound(MASK_CHANNEL_ACTIVE).invokeChannelActive(); } - private void invokeChannelActive() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelActive(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelActive(); + final void invokeChannelActive() { + try { + ((ChannelInboundHandler) handler()).channelActive(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelInactive() { - invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE)); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelInactive(); + } else { + executor.execute(this::findAndInvokeChannelInactive); + } return this; } - static void invokeChannelInactive(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelInactive(); - } else { - executor.execute(next::invokeChannelInactive); - } + private void findAndInvokeChannelInactive() { + findContextInbound(MASK_CHANNEL_INACTIVE).invokeChannelInactive(); } - private void invokeChannelInactive() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelInactive(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelInactive(); + final void invokeChannelInactive() { + try { + ((ChannelInboundHandler) handler()).channelInactive(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override - public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { - invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause); - return this; - } - - static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) { + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { ObjectUtil.checkNotNull(cause, "cause"); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeExceptionCaught(cause); + findAndInvokeExceptionCaught(cause); } else { try { - executor.execute(() -> next.invokeExceptionCaught(cause)); + executor.execute(() -> findAndInvokeExceptionCaught(cause)); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to submit an exceptionCaught() event.", t); @@ -391,146 +349,137 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } } } + return this; } - private void invokeExceptionCaught(final Throwable cause) { - if (invokeHandler()) { - try { - handler().exceptionCaught(this, cause); - } catch (Throwable error) { - if (logger.isDebugEnabled()) { - logger.debug( + private void findAndInvokeExceptionCaught(Throwable cause) { + findContextInbound(MASK_EXCEPTION_CAUGHT).invokeExceptionCaught(cause); + } + + final void invokeExceptionCaught(final Throwable cause) { + try { + handler().exceptionCaught(this, cause); + } catch (Throwable error) { + if (logger.isDebugEnabled()) { + logger.debug( "An exception {}" + - "was thrown by a user handler's exceptionCaught() " + - "method while handling the following exception:", + "was thrown by a user handler's exceptionCaught() " + + "method while handling the following exception:", ThrowableUtil.stackTraceToString(error), cause); - } else if (logger.isWarnEnabled()) { - logger.warn( + } else if (logger.isWarnEnabled()) { + logger.warn( "An exception '{}' [enable DEBUG level for full stacktrace] " + - "was thrown by a user handler's exceptionCaught() " + - "method while handling the following exception:", error, cause); - } + "was thrown by a user handler's exceptionCaught() " + + "method while handling the following exception:", error, cause); } - } else { - fireExceptionCaught(cause); } } @Override - public ChannelHandlerContext fireUserEventTriggered(final Object event) { - invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event); + public ChannelHandlerContext fireUserEventTriggered(Object event) { + ObjectUtil.checkNotNull(event, "event"); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeUserEventTriggered(event); + } else { + executor.execute(() -> findAndInvokeUserEventTriggered(event)); + } return this; } - static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) { - ObjectUtil.checkNotNull(event, "event"); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeUserEventTriggered(event); - } else { - executor.execute(() -> next.invokeUserEventTriggered(event)); - } + private void findAndInvokeUserEventTriggered(Object event) { + findContextInbound(MASK_USER_EVENT_TRIGGERED).invokeUserEventTriggered(event); } - private void invokeUserEventTriggered(Object event) { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).userEventTriggered(this, event); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireUserEventTriggered(event); + final void invokeUserEventTriggered(Object event) { + try { + ((ChannelInboundHandler) handler()).userEventTriggered(this, event); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelRead(final Object msg) { - invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); + ObjectUtil.checkNotNull(msg, "msg"); + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + findAndInvokeChannelRead(msg); + } else { + try { + executor.execute(() -> findAndInvokeChannelRead(msg)); + } catch (Throwable cause) { + ReferenceCountUtil.release(msg); + throw cause; + } + } return this; } - static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { - final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); - EventExecutor executor = next.executor(); - if (executor.inEventLoop()) { - next.invokeChannelRead(m); - } else { - executor.execute(() -> next.invokeChannelRead(m)); - } + private void findAndInvokeChannelRead(Object msg) { + findContextInbound(MASK_CHANNEL_READ).invokeChannelRead(msg); } - private void invokeChannelRead(Object msg) { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelRead(this, msg); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelRead(msg); + final void invokeChannelRead(Object msg) { + final Object m = pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), this); + try { + ((ChannelInboundHandler) handler()).channelRead(this, m); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelReadComplete() { - invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE)); - return this; - } - - static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeChannelReadComplete(); + findAndInvokeChannelReadComplete(); } else { - Tasks tasks = next.invokeTasks; + Tasks tasks = invokeTasks; if (tasks == null) { - next.invokeTasks = tasks = new Tasks(next); + invokeTasks = tasks = new Tasks(this); } executor.execute(tasks.invokeChannelReadCompleteTask); } + return this; } - private void invokeChannelReadComplete() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelReadComplete(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelReadComplete(); + private void findAndInvokeChannelReadComplete() { + findContextInbound(MASK_CHANNEL_READ_COMPLETE).invokeChannelReadComplete(); + } + + final void invokeChannelReadComplete() { + try { + ((ChannelInboundHandler) handler()).channelReadComplete(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @Override public ChannelHandlerContext fireChannelWritabilityChanged() { - invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED)); - return this; - } - - static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) { - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeChannelWritabilityChanged(); + findAndInvokeChannelWritabilityChanged(); } else { - Tasks tasks = next.invokeTasks; + Tasks tasks = invokeTasks; if (tasks == null) { - next.invokeTasks = tasks = new Tasks(next); + invokeTasks = tasks = new Tasks(this); } executor.execute(tasks.invokeChannelWritableStateChangedTask); } + return this; } - private void invokeChannelWritabilityChanged() { - if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - fireChannelWritabilityChanged(); + private void findAndInvokeChannelWritabilityChanged() { + findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED).invokeChannelWritabilityChanged(); + } + + final void invokeChannelWritabilityChanged() { + try { + ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @@ -579,25 +528,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeBind(localAddress, promise); + findAndInvokeBind(localAddress, promise); } else { - safeExecute(executor, () -> next.invokeBind(localAddress, promise), promise, null); + safeExecute(executor, () -> findAndInvokeBind(localAddress, promise), promise, null); } return promise; } + private void findAndInvokeBind(SocketAddress localAddress, ChannelPromise promise) { + findContextOutbound(MASK_BIND).invokeBind(localAddress, promise); + } + private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - bind(localAddress, promise); + try { + ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -618,25 +566,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeConnect(remoteAddress, localAddress, promise); + findAndInvokeConnect(remoteAddress, localAddress, promise); } else { - safeExecute(executor, () -> next.invokeConnect(remoteAddress, localAddress, promise), promise, null); + safeExecute(executor, () -> findAndInvokeConnect(remoteAddress, localAddress, promise), promise, null); } return promise; } + private void findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + findContextOutbound(MASK_CONNECT).invokeConnect(remoteAddress, localAddress, promise); + } + private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - connect(remoteAddress, localAddress, promise); + try { + ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -647,37 +594,38 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { // Translate disconnect to close if the channel has no notion of disconnect-reconnect. // So far, UDP/IP is the only transport that has such behavior. if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); + findAndInvokeClose(promise); } else { - next.invokeDisconnect(promise); + findAndInvokeDisconnect(promise); } } else { safeExecute(executor, () -> { + // Translate disconnect to close if the channel has no notion of disconnect-reconnect. + // So far, UDP/IP is the only transport that has such behavior. if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); + findAndInvokeClose(promise); } else { - next.invokeDisconnect(promise); + findAndInvokeDisconnect(promise); } }, promise, null); } return promise; } + private void findAndInvokeDisconnect(ChannelPromise promise) { + findContextOutbound(MASK_DISCONNECT).invokeDisconnect(promise); + } + private void invokeDisconnect(ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - disconnect(promise); + try { + ((ChannelOutboundHandler) handler()).disconnect(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -688,26 +636,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeClose(promise); + findAndInvokeClose(promise); } else { - safeExecute(executor, () -> next.invokeClose(promise), promise, null); + safeExecute(executor, () -> findAndInvokeClose(promise), promise, null); } - return promise; } + private void findAndInvokeClose(ChannelPromise promise) { + findContextOutbound(MASK_CLOSE).invokeClose(promise); + } + private void invokeClose(ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - close(promise); + try { + ((ChannelOutboundHandler) handler()).close(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -718,26 +664,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_REGISTER); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeRegister(promise); + findAndInvokeRegister(promise); } else { - safeExecute(executor, () -> next.invokeRegister(promise), promise, null); + safeExecute(executor, () -> findAndInvokeRegister(promise), promise, null); } - return promise; } + private void findAndInvokeRegister(ChannelPromise promise) { + findContextOutbound(MASK_REGISTER).invokeRegister(promise); + } + private void invokeRegister(ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).register(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - register(promise); + try { + ((ChannelOutboundHandler) handler()).register(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -748,55 +692,51 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return promise; } - final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeDeregister(promise); + findAndInvokeDeregister(promise); } else { - safeExecute(executor, () -> next.invokeDeregister(promise), promise, null); + safeExecute(executor, () -> findAndInvokeDeregister(promise), promise, null); } - return promise; } + private void findAndInvokeDeregister(ChannelPromise promise) { + findContextOutbound(MASK_DEREGISTER).invokeDeregister(promise); + } + private void invokeDeregister(ChannelPromise promise) { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).deregister(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - deregister(promise); + try { + ((ChannelOutboundHandler) handler()).deregister(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @Override public ChannelHandlerContext read() { - final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeRead(); + findAndInvokeRead(); } else { - Tasks tasks = next.invokeTasks; + Tasks tasks = invokeTasks; if (tasks == null) { - next.invokeTasks = tasks = new Tasks(next); + invokeTasks = tasks = new Tasks(this); } executor.execute(tasks.invokeReadTask); } - return this; } + private void findAndInvokeRead() { + findContextOutbound(MASK_READ).invokeRead(); + } + private void invokeRead() { - if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).read(this); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - read(); + try { + ((ChannelOutboundHandler) handler()).read(this); + } catch (Throwable t) { + notifyHandlerException(t); } } @@ -813,16 +753,9 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeWrite(Object msg, ChannelPromise promise) { - if (invokeHandler()) { - invokeWrite0(msg, promise); - } else { - write(msg, promise); - } - } - - private void invokeWrite0(Object msg, ChannelPromise promise) { + final Object m = pipeline.touch(msg, this); try { - ((ChannelOutboundHandler) handler()).write(this, msg, promise); + ((ChannelOutboundHandler) handler()).write(this, m, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } @@ -830,14 +763,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap @Override public ChannelHandlerContext flush() { - final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { - next.invokeFlush(); + findAndInvokeFlush(); } else { - Tasks tasks = next.invokeTasks; + Tasks tasks = invokeTasks; if (tasks == null) { - next.invokeTasks = tasks = new Tasks(next); + invokeTasks = tasks = new Tasks(this); } safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null); } @@ -845,15 +777,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return this; } - private void invokeFlush() { - if (invokeHandler()) { - invokeFlush0(); - } else { - flush(); - } + private void findAndInvokeFlush() { + findContextOutbound(MASK_FLUSH).invokeFlush(); } - private void invokeFlush0() { + private void invokeFlush() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { @@ -868,12 +796,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { - if (invokeHandler()) { - invokeWrite0(msg, promise); - invokeFlush0(); - } else { - writeAndFlush(msg, promise); - } + invokeWrite(msg, promise); + invokeFlush(); } private void write(Object msg, boolean flush, ChannelPromise promise) { @@ -889,23 +813,23 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap throw e; } - final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); - final Object m = pipeline.touch(msg, next); - EventExecutor executor = next.executor(); + EventExecutor executor = executor(); if (executor.inEventLoop()) { + final AbstractChannelHandlerContext next = findContextOutbound(flush ? + (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); if (flush) { - next.invokeWriteAndFlush(m, promise); + next.invokeWriteAndFlush(msg, promise); } else { - next.invokeWrite(m, promise); + next.invokeWrite(msg, promise); } } else { final AbstractWriteTask task; if (flush) { - task = WriteAndFlushTask.newInstance(next, m, promise); + task = WriteAndFlushTask.newInstance(this, msg, promise); } else { - task = WriteTask.newInstance(next, m, promise); + task = WriteTask.newInstance(this, msg, promise); } - if (!safeExecute(executor, task, promise, m)) { + if (!safeExecute(executor, task, promise, msg)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // @@ -1041,28 +965,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return channel().voidPromise(); } - final void setRemoved() { + private void setRemoved() { handlerState = REMOVE_COMPLETE; } final boolean setAddComplete() { - for (;;) { - int oldState = handlerState; - if (oldState == REMOVE_COMPLETE) { - return false; - } - // Ensure we never update when the handlerState is REMOVE_COMPLETE already. - // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not - // exposing ordering guarantees. - if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { - return true; - } - } - } - - final void setAddPending() { - boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING); - assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved(). + // Ensure we never update when the handlerState is REMOVE_COMPLETE already. + // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not + // exposing ordering guarantees. + return HANDLER_STATE_UPDATER.getAndSet(this, ADD_COMPLETE) != REMOVE_COMPLETE; } final void callHandlerAdded() throws Exception { @@ -1085,20 +996,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } } - /** - * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called - * yet. If not return {@code false} and if called or could not detect return {@code true}. - * - * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event. - * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list - * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}. - */ - private boolean invokeHandler() { - // Store in local variable to reduce volatile reads. - int handlerState = this.handlerState; - return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); - } - @Override public boolean isRemoved() { return handlerState == REMOVE_COMPLETE; @@ -1140,15 +1037,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')'; } + private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = + SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); + + // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment + private static final int WRITE_TASK_OVERHEAD = + SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); + abstract static class AbstractWriteTask implements Runnable { - private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = - SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); - - // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment - private static final int WRITE_TASK_OVERHEAD = - SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); - private final Recycler.Handle handle; private AbstractChannelHandlerContext ctx; private Object msg; @@ -1174,11 +1071,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } } + protected abstract AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx); @Override public final void run() { try { decrementPendingOutboundBytes(); - write(ctx, msg, promise); + AbstractChannelHandlerContext next = findContext(ctx); + write(next, msg, promise); } finally { recycle(); } @@ -1227,6 +1126,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap return task; } + @Override + protected AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx) { + return ctx.findContextOutbound(MASK_WRITE); + } + private WriteTask(Recycler.Handle handle) { super(handle); } @@ -1252,6 +1156,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap super(handle); } + @Override + protected AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx) { + return ctx.findContextOutbound(MASK_WRITE | MASK_FLUSH); + } + @Override public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { super.write(ctx, msg, promise); @@ -1265,11 +1174,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap private final Runnable invokeChannelWritableStateChangedTask; private final Runnable invokeFlushTask; - Tasks(AbstractChannelHandlerContext next) { - invokeChannelReadCompleteTask = next::invokeChannelReadComplete; - invokeReadTask = next::invokeRead; - invokeChannelWritableStateChangedTask = next::invokeChannelWritabilityChanged; - invokeFlushTask = next::invokeFlush; + Tasks(AbstractChannelHandlerContext ctx) { + invokeChannelReadCompleteTask = ctx::findAndInvokeChannelReadComplete; + invokeReadTask = ctx::findAndInvokeRead; + invokeChannelWritableStateChangedTask = ctx::invokeChannelWritabilityChanged; + invokeFlushTask = ctx::findAndInvokeFlush; } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 8a8571af15..386c0c76ed 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -16,22 +16,19 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; -import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NoSuchElementException; /** * A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a - * {@link Channel}. {@link ChannelPipeline} implements an advanced form of the + * {@link Channel}. {@link ChannelPipeline} implements an advanced form of the * Intercepting Filter pattern * to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline * interact with each other. @@ -179,7 +176,7 @@ import java.util.NoSuchElementException; *

Building a pipeline

*

* A user is supposed to have one or more {@link ChannelHandler}s in a pipeline to receive I/O events (e.g. read) and - * to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers + * to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers * in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the * protocol and business logic: * @@ -192,7 +189,6 @@ import java.util.NoSuchElementException; * and it could be represented as shown in the following example: * *

- * static final {@link EventExecutorGroup} group = new {@link DefaultEventExecutorGroup}(16);
  * ...
  *
  * {@link ChannelPipeline} pipeline = ch.pipeline();
@@ -200,12 +196,9 @@ import java.util.NoSuchElementException;
  * pipeline.addLast("decoder", new MyProtocolDecoder());
  * pipeline.addLast("encoder", new MyProtocolEncoder());
  *
- * // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
- * // in a different thread than an I/O thread so that the I/O thread is not blocked by
- * // a time-consuming task.
- * // If your business logic is fully asynchronous or finished very quickly, you don't
- * // need to specify a group.
- * pipeline.addLast(group.next(), "handler", new MyBusinessLogicHandler());
+ * // If your business logic does block or take a lot of time you should offload the work to an extra
+ * // {@link java.util.concurrent.Executor} to ensure you don't block the {@link EventLoop}.
+ * pipeline.addLast("handler",  new MyBusinessLogicHandler());
  * 
* *

Thread safety

@@ -215,7 +208,7 @@ import java.util.NoSuchElementException; * after the exchange. */ public interface ChannelPipeline - extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable> { + extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable> { /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. @@ -230,21 +223,6 @@ public interface ChannelPipeline */ ChannelPipeline addFirst(String name, ChannelHandler handler); - /** - * Inserts a {@link ChannelHandler} at the first position of this pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} - * methods - * @param name the name of the handler to insert first - * @param handler the handler to insert first - * - * @throws IllegalArgumentException - * if there's an entry with the same name already in the pipeline - * @throws NullPointerException - * if the specified handler is {@code null} - */ - ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler); - /** * Appends a {@link ChannelHandler} at the last position of this pipeline. * @@ -258,21 +236,6 @@ public interface ChannelPipeline */ ChannelPipeline addLast(String name, ChannelHandler handler); - /** - * Appends a {@link ChannelHandler} at the last position of this pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} - * methods - * @param name the name of the handler to append - * @param handler the handler to append - * - * @throws IllegalArgumentException - * if there's an entry with the same name already in the pipeline - * @throws NullPointerException - * if the specified handler is {@code null} - */ - ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler); - /** * Inserts a {@link ChannelHandler} before an existing handler of this * pipeline. @@ -290,25 +253,6 @@ public interface ChannelPipeline */ ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); - /** - * Inserts a {@link ChannelHandler} before an existing handler of this - * pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} - * methods - * @param baseName the name of the existing handler - * @param name the name of the handler to insert before - * @param handler the handler to insert before - * - * @throws NoSuchElementException - * if there's no such entry with the specified {@code baseName} - * @throws IllegalArgumentException - * if there's an entry with the same name already in the pipeline - * @throws NullPointerException - * if the specified baseName or handler is {@code null} - */ - ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler); - /** * Inserts a {@link ChannelHandler} after an existing handler of this * pipeline. @@ -326,25 +270,6 @@ public interface ChannelPipeline */ ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); - /** - * Inserts a {@link ChannelHandler} after an existing handler of this - * pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} - * methods - * @param baseName the name of the existing handler - * @param name the name of the handler to insert after - * @param handler the handler to insert after - * - * @throws NoSuchElementException - * if there's no such entry with the specified {@code baseName} - * @throws IllegalArgumentException - * if there's an entry with the same name already in the pipeline - * @throws NullPointerException - * if the specified baseName or handler is {@code null} - */ - ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler); - /** * Inserts {@link ChannelHandler}s at the first position of this pipeline. * @@ -353,16 +278,6 @@ public interface ChannelPipeline */ ChannelPipeline addFirst(ChannelHandler... handlers); - /** - * Inserts {@link ChannelHandler}s at the first position of this pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s - * methods. - * @param handlers the handlers to insert first - * - */ - ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers); - /** * Inserts {@link ChannelHandler}s at the last position of this pipeline. * @@ -371,16 +286,6 @@ public interface ChannelPipeline */ ChannelPipeline addLast(ChannelHandler... handlers); - /** - * Inserts {@link ChannelHandler}s at the last position of this pipeline. - * - * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s - * methods. - * @param handlers the handlers to insert last - * - */ - ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers); - /** * Removes the specified {@link ChannelHandler} from this pipeline. * @@ -624,4 +529,9 @@ public interface ChannelPipeline @Override ChannelPipeline flush(); + + /** + * Returns the {@link EventExecutor} which is used by all {@link ChannelHandler}s in the pipeline. + */ + EventExecutor executor(); } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 3a3fe3412b..14c41b7acb 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -23,8 +23,8 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; DefaultChannelHandlerContext( - DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { - super(pipeline, executor, name, ObjectUtil.checkNotNull(handler, "handler").getClass()); + DefaultChannelPipeline pipeline, String name, ChannelHandler handler) { + super(pipeline, name, ObjectUtil.checkNotNull(handler, "handler").getClass()); this.handler = handler; } @@ -32,4 +32,9 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { public ChannelHandler handler() { return handler; } + + @Override + public EventExecutor executor() { + return pipeline().executor(); + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 2e9c101ea4..f1daeaca06 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -28,6 +28,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -35,6 +36,8 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.IntSupplier; +import java.util.function.Predicate; /** * The default {@link ChannelPipeline} implementation. It is usually created @@ -65,12 +68,13 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled(); + private final List handlers = new ArrayList<>(4); private volatile MessageSizeEstimator.Handle estimatorHandle; - protected DefaultChannelPipeline(Channel channel) { + public DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); - succeededFuture = new SucceededChannelFuture(channel, null); + succeededFuture = new SucceededChannelFuture(channel, channel.eventLoop()); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); @@ -95,8 +99,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { return touch ? ReferenceCountUtil.touch(msg, next) : msg; } - private AbstractChannelHandlerContext newContext(EventExecutor executor, String name, ChannelHandler handler) { - return new DefaultChannelHandlerContext(this, executor, name, handler); + private AbstractChannelHandlerContext newContext(String name, ChannelHandler handler) { + return new DefaultChannelHandlerContext(this, name, handler); } @Override @@ -105,29 +109,37 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public final ChannelPipeline addFirst(String name, ChannelHandler handler) { - return addFirst(null, name, handler); + public final EventExecutor executor() { + return channel().eventLoop(); } @Override - public final ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler) { - final AbstractChannelHandlerContext newCtx; - synchronized (this) { - checkMultiplicity(handler); - name = filterName(name, handler); + public final ChannelPipeline addFirst(String name, ChannelHandler handler) { + checkMultiplicity(handler); + if (name == null) { + name = generateName(handler); + } - newCtx = newContext(executor, name, handler); - - addFirst0(newCtx); - - EventExecutor ctxExecutor = newCtx.executor(); - if (!ctxExecutor.inEventLoop()) { - newCtx.setAddPending(); - ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); - return this; + AbstractChannelHandlerContext newCtx = newContext(name, handler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + if (context(name) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + name); + } + handlers.add(0, newCtx); + if (!inEventLoop) { + try { + executor.execute(() -> addFirst0(newCtx)); + return this; + } catch (Throwable cause) { + handlers.remove(0); + throw cause; + } } } - callHandlerAdded0(newCtx); + + addFirst0(newCtx); return this; } @@ -137,31 +149,36 @@ public class DefaultChannelPipeline implements ChannelPipeline { newCtx.next = nextCtx; head.next = newCtx; nextCtx.prev = newCtx; + callHandlerAdded0(newCtx); } @Override public final ChannelPipeline addLast(String name, ChannelHandler handler) { - return addLast(null, name, handler); - } + checkMultiplicity(handler); + if (name == null) { + name = generateName(handler); + } - @Override - public final ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler) { - final AbstractChannelHandlerContext newCtx; + AbstractChannelHandlerContext newCtx = newContext(name, handler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); synchronized (this) { - checkMultiplicity(handler); - - newCtx = newContext(executor, filterName(name, handler), handler); - - addLast0(newCtx); - - EventExecutor ctxExecutor = newCtx.executor(); - if (!ctxExecutor.inEventLoop()) { - newCtx.setAddPending(); - ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); - return this; + if (context(name) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + name); + } + handlers.add(newCtx); + if (!inEventLoop) { + try { + executor.execute(() -> addLast0(newCtx)); + return this; + } catch (Throwable cause) { + handlers.remove(handlers.size() - 1); + throw cause; + } } } - callHandlerAdded0(newCtx); + + addLast0(newCtx); return this; } @@ -171,89 +188,101 @@ public class DefaultChannelPipeline implements ChannelPipeline { newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; + callHandlerAdded0(newCtx); } @Override public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { - return addBefore(null, baseName, name, handler); - } - - @Override - public final ChannelPipeline addBefore( - EventExecutor executor, String baseName, String name, ChannelHandler handler) { - final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; - synchronized (this) { - checkMultiplicity(handler); - name = filterName(name, handler); - ctx = getContextOrDie(baseName); - newCtx = newContext(executor, name, handler); + checkMultiplicity(handler); + if (name == null) { + name = generateName(handler); + } - addBefore0(ctx, newCtx); + AbstractChannelHandlerContext newCtx = newContext(name, handler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int i = findCtxIdx(context -> context.name().equals(baseName)); - EventExecutor ctxExecutor = newCtx.executor(); - if (!ctxExecutor.inEventLoop()) { - newCtx.setAddPending(); - ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); - return this; + if (i == -1) { + throw new NoSuchElementException(baseName); + } + + if (context(name) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + name); + } + ctx = handlers.get(i); + handlers.add(i, newCtx); + if (!inEventLoop) { + try { + executor.execute(() -> addBefore0(ctx, newCtx)); + return this; + } catch (Throwable cause) { + handlers.remove(i); + throw cause; + } } } - callHandlerAdded0(newCtx); + + addBefore0(ctx, newCtx); return this; } - private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { + private void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { newCtx.prev = ctx.prev; newCtx.next = ctx; ctx.prev.next = newCtx; ctx.prev = newCtx; - } - - private String filterName(String name, ChannelHandler handler) { - if (name == null) { - return generateName(handler); - } - checkDuplicateName(name); - return name; + callHandlerAdded0(newCtx); } @Override public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { - return addAfter(null, baseName, name, handler); - } - - @Override - public final ChannelPipeline addAfter( - EventExecutor executor, String baseName, String name, ChannelHandler handler) { - final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; - synchronized (this) { - checkMultiplicity(handler); - name = filterName(name, handler); - ctx = getContextOrDie(baseName); + checkMultiplicity(handler); + if (name == null) { + name = generateName(handler); + } - newCtx = newContext(executor, name, handler); + AbstractChannelHandlerContext newCtx = newContext(name, handler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int i = findCtxIdx(context -> context.name().equals(baseName)); - addAfter0(ctx, newCtx); + if (i == -1) { + throw new NoSuchElementException(baseName); + } - EventExecutor ctxExecutor = newCtx.executor(); - if (!ctxExecutor.inEventLoop()) { - newCtx.setAddPending(); - ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); - return this; + if (context(name) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + name); + } + ctx = handlers.get(i); + handlers.add(i + 1, newCtx); + if (!inEventLoop) { + try { + executor.execute(() -> addAfter0(ctx, newCtx)); + return this; + } catch (Throwable cause) { + handlers.remove(i + 1); + throw cause; + } } } - callHandlerAdded0(newCtx); + + addAfter0(ctx, newCtx); return this; } - private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { + private void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { newCtx.prev = ctx; newCtx.next = ctx.next; ctx.next.prev = newCtx; ctx.next = newCtx; + callHandlerAdded0(newCtx); } public final ChannelPipeline addFirst(ChannelHandler handler) { @@ -262,11 +291,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPipeline addFirst(ChannelHandler... handlers) { - return addFirst(null, handlers); - } - - @Override - public final ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -283,7 +307,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { for (int i = size - 1; i >= 0; i --) { ChannelHandler h = handlers[i]; - addFirst(executor, null, h); + addFirst(null, h); } return this; @@ -295,11 +319,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { - return addLast(null, handlers); - } - - @Override - public final ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -308,7 +327,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (h == null) { break; } - addLast(executor, null, h); + addLast(null, h); } return this; @@ -323,18 +342,21 @@ public class DefaultChannelPipeline implements ChannelPipeline { cache.put(handlerType, name); } - // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid - // any name conflicts. Note that we don't cache the names generated here. - if (context0(name) != null) { - String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'. - for (int i = 1;; i ++) { - String newName = baseName + i; - if (context0(newName) == null) { - name = newName; - break; + synchronized (handlers) { + // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid + // any name conflicts. Note that we don't cache the names generated here. + if (context(name) != null) { + String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'. + for (int i = 1;; i ++) { + String newName = baseName + i; + if (context(newName) == null) { + name = newName; + break; + } } } } + return name; } @@ -342,141 +364,214 @@ public class DefaultChannelPipeline implements ChannelPipeline { return StringUtil.simpleClassName(handlerType) + "#0"; } + private int findCtxIdx(Predicate predicate) { + for (int i = 0; i < handlers.size(); i++) { + if (predicate.test(handlers.get(i))) { + return i; + } + } + return -1; + } + @Override public final ChannelPipeline remove(ChannelHandler handler) { - remove(getContextOrDie(handler)); + final AbstractChannelHandlerContext ctx; + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = findCtxIdx(context -> context.handler() == handler); + if (idx == -1) { + throw new NoSuchElementException(); + } + ctx = handlers.remove(idx); + assert ctx != null; + + if (!inEventLoop) { + try { + executor.execute(() -> remove0(ctx)); + return this; + } catch (Throwable cause) { + handlers.add(idx, ctx); + throw cause; + } + } + } + + remove0(ctx); return this; } @Override public final ChannelHandler remove(String name) { - return remove(getContextOrDie(name)).handler(); + final AbstractChannelHandlerContext ctx; + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = findCtxIdx(context -> context.name().equals(name)); + if (idx == -1) { + throw new NoSuchElementException(); + } + ctx = handlers.remove(idx); + assert ctx != null; + + if (!inEventLoop) { + try { + executor.execute(() -> remove0(ctx)); + return ctx.handler(); + } catch (Throwable cause) { + handlers.add(idx, ctx); + throw cause; + } + } + } + + remove0(ctx); + return ctx.handler(); } @SuppressWarnings("unchecked") @Override public final T remove(Class handlerType) { - return (T) remove(getContextOrDie(handlerType)).handler(); + final AbstractChannelHandlerContext ctx; + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = findCtxIdx(context -> handlerType.isAssignableFrom(context.handler().getClass())); + if (idx == -1) { + throw new NoSuchElementException(); + } + ctx = handlers.remove(idx); + assert ctx != null; + + if (!inEventLoop) { + try { + executor.execute(() -> remove0(ctx)); + return (T) ctx.handler(); + } catch (Throwable cause) { + handlers.add(idx, ctx); + throw cause; + } + } + } + + remove0(ctx); + return (T) ctx.handler(); } public final T removeIfExists(String name) { - return removeIfExists(context(name)); + return removeIfExists(() -> findCtxIdx(context -> name.equals(context.name()))); } public final T removeIfExists(Class handlerType) { - return removeIfExists(context(handlerType)); + return removeIfExists(() -> findCtxIdx( + context -> handlerType.isAssignableFrom(context.handler().getClass()))); } public final T removeIfExists(ChannelHandler handler) { - return removeIfExists(context(handler)); + return removeIfExists(() -> findCtxIdx(context -> handler == context.handler())); } @SuppressWarnings("unchecked") - private T removeIfExists(ChannelHandlerContext ctx) { - if (ctx == null) { - return null; - } - return (T) remove((AbstractChannelHandlerContext) ctx).handler(); - } + private T removeIfExists(IntSupplier idxSupplier) { + final AbstractChannelHandlerContext ctx; + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = idxSupplier.getAsInt(); + if (idx == -1) { + return null; + } + ctx = handlers.remove(idx); + assert ctx != null; - private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { - assert ctx != head && ctx != tail; - - synchronized (this) { - remove0(ctx); - - EventExecutor executor = ctx.executor(); - if (!executor.inEventLoop()) { - executor.execute(() -> callHandlerRemoved0(ctx)); - return ctx; + if (!inEventLoop) { + try { + executor.execute(() -> remove0(ctx)); + return (T) ctx.handler(); + } catch (Throwable cause) { + handlers.add(idx, ctx); + throw cause; + } } } - callHandlerRemoved0(ctx); - return ctx; + remove0(ctx); + return (T) ctx.handler(); } - private static void remove0(AbstractChannelHandlerContext ctx) { + private void unlink(AbstractChannelHandlerContext ctx) { + assert ctx != head && ctx != tail; AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; } - @Override - public final ChannelHandler removeFirst() { - if (head.next == tail) { - throw new NoSuchElementException(); - } - return remove(head.next).handler(); - } - - @Override - public final ChannelHandler removeLast() { - if (head.next == tail) { - throw new NoSuchElementException(); - } - return remove(tail.prev).handler(); + private void remove0(AbstractChannelHandlerContext ctx) { + unlink(ctx); + callHandlerRemoved0(ctx); } @Override public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { - replace(getContextOrDie(oldHandler), newName, newHandler); + replace(ctx -> ctx.handler() == oldHandler, newName, newHandler); return this; } @Override public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { - return replace(getContextOrDie(oldName), newName, newHandler); + return replace(ctx -> ctx.name().equals(oldName), newName, newHandler); } @Override @SuppressWarnings("unchecked") public final T replace( Class oldHandlerType, String newName, ChannelHandler newHandler) { - return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler); + return (T) replace(ctx -> oldHandlerType.isAssignableFrom(ctx.handler().getClass()), newName, newHandler); } private ChannelHandler replace( - final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) { - assert ctx != head && ctx != tail; + Predicate predicate, String newName, ChannelHandler newHandler) { + checkMultiplicity(newHandler); - final AbstractChannelHandlerContext newCtx; - synchronized (this) { - checkMultiplicity(newHandler); - if (newName == null) { - newName = generateName(newHandler); - } else { - boolean sameName = ctx.name().equals(newName); - if (!sameName) { - checkDuplicateName(newName); + if (newName == null) { + newName = generateName(newHandler); + } + AbstractChannelHandlerContext oldCtx; + AbstractChannelHandlerContext newCtx = newContext(newName, newHandler); + EventExecutor executor = executor(); + boolean inEventLoop = executor.inEventLoop(); + synchronized (handlers) { + int idx = findCtxIdx(predicate); + if (idx == -1) { + throw new NoSuchElementException(); + } + oldCtx = handlers.get(idx); + assert oldCtx != head && oldCtx != tail && oldCtx != null; + + if (!oldCtx.name().equals(newName)) { + if (context(newName) != null) { + throw new IllegalArgumentException("Duplicate handler name: " + newName); } } + AbstractChannelHandlerContext removed = handlers.set(idx, newCtx); + assert removed != null; - newCtx = newContext(ctx.executor, newName, newHandler); - - replace0(ctx, newCtx); - - EventExecutor executor = ctx.executor(); - if (!executor.inEventLoop()) { - executor.execute(() -> { - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and - // those event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - }); - return ctx.handler(); + if (!inEventLoop) { + try { + executor.execute(() -> replace0(oldCtx, newCtx)); + return oldCtx.handler(); + } catch (Throwable cause) { + handlers.set(idx, oldCtx); + throw cause; + } } } - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those - // event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - return ctx.handler(); + + replace0(oldCtx, newCtx); + return oldCtx.handler(); } - private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) { + private void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = oldCtx.prev; AbstractChannelHandlerContext next = oldCtx.next; newCtx.prev = prev; @@ -492,6 +587,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { // update the reference to the replacement so forward of buffered content will work correctly oldCtx.prev = newCtx; oldCtx.next = newCtx; + + // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) + // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those + // event handlers must be called after handlerAdded(). + callHandlerAdded0(newCtx); + callHandlerRemoved0(oldCtx); } private static void checkMultiplicity(ChannelHandler handler) { @@ -512,8 +613,13 @@ public class DefaultChannelPipeline implements ChannelPipeline { } catch (Throwable t) { boolean removed = false; try { - remove0(ctx); + synchronized (handlers) { + handlers.remove(ctx); + } + + unlink(ctx); ctx.callHandlerRemoved(); + removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { @@ -543,61 +649,27 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - @Override - public final ChannelHandler first() { - ChannelHandlerContext first = firstContext(); - if (first == null) { - return null; - } - return first.handler(); - } - - @Override - public final ChannelHandlerContext firstContext() { - AbstractChannelHandlerContext first = head.next; - if (first == tail) { - return null; - } - return head.next; - } - - @Override - public final ChannelHandler last() { - AbstractChannelHandlerContext last = tail.prev; - if (last == head) { - return null; - } - return last.handler(); - } - - @Override - public final ChannelHandlerContext lastContext() { - AbstractChannelHandlerContext last = tail.prev; - if (last == head) { - return null; - } - return last; - } - @Override public final ChannelHandler get(String name) { ChannelHandlerContext ctx = context(name); - if (ctx == null) { - return null; - } else { - return ctx.handler(); - } + return ctx == null ? null : ctx.handler(); } @SuppressWarnings("unchecked") @Override public final T get(Class handlerType) { ChannelHandlerContext ctx = context(handlerType); - if (ctx == null) { - return null; - } else { - return (T) ctx.handler(); + return ctx == null ? null : (T) ctx.handler(); + } + + private AbstractChannelHandlerContext findCtx(Predicate predicate) { + for (int i = 0; i < handlers.size(); i++) { + AbstractChannelHandlerContext ctx = handlers.get(i); + if (predicate.test(ctx)) { + return ctx; + } } + return null; } @Override @@ -605,8 +677,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (name == null) { throw new NullPointerException("name"); } - - return context0(name); + synchronized (handlers) { + return findCtx(ctx -> ctx.name().equals(name)); + } } @Override @@ -615,18 +688,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NullPointerException("handler"); } - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - - if (ctx == null) { - return null; - } - - if (ctx.handler() == handler) { - return ctx; - } - - ctx = ctx.next; + synchronized (handlers) { + return findCtx(ctx -> ctx.handler() == handler); } } @@ -636,49 +699,22 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NullPointerException("handlerType"); } - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - if (ctx == null) { - return null; - } - if (handlerType.isAssignableFrom(ctx.handler().getClass())) { - return ctx; - } - ctx = ctx.next; + synchronized (handlers) { + return findCtx(ctx -> handlerType.isAssignableFrom(ctx.handler().getClass())); } } @Override public final List names() { - List list = new ArrayList<>(); - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - if (ctx == null) { - return list; + synchronized (handlers) { + List names = new ArrayList<>(handlers.size()); + for (int i = 0; i < handlers.size(); i++) { + names.add(handlers.get(i).name()); } - list.add(ctx.name()); - ctx = ctx.next; + return names; } } - @Override - public final Map toMap() { - Map map = new LinkedHashMap<>(); - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - if (ctx == tail) { - return map; - } - map.put(ctx.name(), ctx.handler()); - ctx = ctx.next; - } - } - - @Override - public final Iterator> iterator() { - return toMap().entrySet().iterator(); - } - /** * Returns the {@link String} representation of this pipeline. */ @@ -687,140 +723,168 @@ public class DefaultChannelPipeline implements ChannelPipeline { StringBuilder buf = new StringBuilder() .append(StringUtil.simpleClassName(this)) .append('{'); - AbstractChannelHandlerContext ctx = head.next; - for (;;) { - if (ctx == tail) { - break; + synchronized (handlers) { + if (!handlers.isEmpty()) { + for (int i = 0; i < handlers.size(); i++) { + AbstractChannelHandlerContext ctx = handlers.get(i); + + buf.append('(') + .append(ctx.name()) + .append(" = ") + .append(ctx.handler().getClass().getName()) + .append("), "); + } + buf.setLength(buf.length() - 2); } - - buf.append('(') - .append(ctx.name()) - .append(" = ") - .append(ctx.handler().getClass().getName()) - .append(')'); - - ctx = ctx.next; - if (ctx == tail) { - break; - } - - buf.append(", "); } buf.append('}'); return buf.toString(); } + @Override + public ChannelHandler removeFirst() { + synchronized (handlers) { + if (handlers.isEmpty()) { + throw new NoSuchElementException(); + } + return handlers.remove(0).handler(); + } + } + + @Override + public ChannelHandler removeLast() { + synchronized (handlers) { + if (handlers.isEmpty()) { + throw new NoSuchElementException(); + } + return handlers.remove(handlers.size() - 1).handler(); + } + } + + @Override + public ChannelHandler first() { + ChannelHandlerContext ctx = firstContext(); + return ctx == null ? null : ctx.handler(); + } + + @Override + public ChannelHandlerContext firstContext() { + synchronized (handlers) { + return handlers.isEmpty() ? null : handlers.get(0); + } + } + + @Override + public ChannelHandler last() { + ChannelHandlerContext ctx = lastContext(); + return ctx == null ? null : ctx.handler(); + } + + @Override + public ChannelHandlerContext lastContext() { + synchronized (handlers) { + return handlers.isEmpty() ? null : handlers.get(handlers.size() - 1); + } + } + + @Override + public Map toMap() { + Map map; + synchronized (handlers) { + if (handlers.isEmpty()) { + return Collections.emptyMap(); + } + map = new LinkedHashMap<>(handlers.size()); + for (int i = 0; i < handlers.size(); i++) { + ChannelHandlerContext ctx = handlers.get(i); + map.put(ctx.name(), ctx.handler()); + } + return map; + } + } + + @Override + public Iterator> iterator() { + return toMap().entrySet().iterator(); + } + @Override public final ChannelPipeline fireChannelRegistered() { - AbstractChannelHandlerContext.invokeChannelRegistered(head); + head.invokeChannelRegistered(); return this; } @Override public final ChannelPipeline fireChannelUnregistered() { - AbstractChannelHandlerContext.invokeChannelUnregistered(head); + head.invokeChannelUnregistered(); return this; } /** * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger * handlerRemoved(). - * - * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)}) - * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that - * the handlers are removed after all events are handled. - * - * See: https://github.com/netty/netty/issues/3156 */ - private synchronized void destroy() { - destroyUp(head.next, false); - } - - private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) { - final Thread currentThread = Thread.currentThread(); - final AbstractChannelHandlerContext tail = this.tail; - for (;;) { - if (ctx == tail) { - destroyDown(currentThread, tail.prev, inEventLoop); - break; - } - - final EventExecutor executor = ctx.executor(); - if (!inEventLoop && !executor.inEventLoop(currentThread)) { - final AbstractChannelHandlerContext finalCtx = ctx; - executor.execute(() -> destroyUp(finalCtx, true)); - break; - } - - ctx = ctx.next; - inEventLoop = false; + private void destroy() { + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + destroy0(); + } else { + executor.execute(this::destroy0); } } - private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) { - // We have reached at tail; now traverse backwards. - final AbstractChannelHandlerContext head = this.head; - for (;;) { - if (ctx == head) { - break; - } - - final EventExecutor executor = ctx.executor(); - if (inEventLoop || executor.inEventLoop(currentThread)) { - synchronized (this) { - remove0(ctx); - } - callHandlerRemoved0(ctx); - } else { - final AbstractChannelHandlerContext finalCtx = ctx; - executor.execute(() -> destroyDown(Thread.currentThread(), finalCtx, true)); - break; + private void destroy0() { + assert executor().inEventLoop(); + AbstractChannelHandlerContext ctx = this.tail.prev; + while (ctx != head) { + synchronized (handlers) { + handlers.remove(ctx); } + remove0(ctx); ctx = ctx.prev; - inEventLoop = false; } } @Override public final ChannelPipeline fireChannelActive() { - AbstractChannelHandlerContext.invokeChannelActive(head); + head.invokeChannelActive(); return this; } @Override public final ChannelPipeline fireChannelInactive() { - AbstractChannelHandlerContext.invokeChannelInactive(head); + head.invokeChannelInactive(); return this; } @Override public final ChannelPipeline fireExceptionCaught(Throwable cause) { - AbstractChannelHandlerContext.invokeExceptionCaught(head, cause); + head.invokeExceptionCaught(cause); return this; } @Override public final ChannelPipeline fireUserEventTriggered(Object event) { - AbstractChannelHandlerContext.invokeUserEventTriggered(head, event); + head.invokeUserEventTriggered(event); return this; } @Override public final ChannelPipeline fireChannelRead(Object msg) { - AbstractChannelHandlerContext.invokeChannelRead(head, msg); + head.invokeChannelRead(msg); return this; } @Override public final ChannelPipeline fireChannelReadComplete() { - AbstractChannelHandlerContext.invokeChannelReadComplete(head); + head.invokeChannelReadComplete(); return this; } @Override public final ChannelPipeline fireChannelWritabilityChanged() { - AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head); + head.invokeChannelWritabilityChanged(); return this; } @@ -929,12 +993,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPromise newPromise() { - return new DefaultChannelPromise(channel); + return new DefaultChannelPromise(channel(), executor()); } @Override public final ChannelProgressivePromise newProgressivePromise() { - return new DefaultChannelProgressivePromise(channel); + return new DefaultChannelProgressivePromise(channel(), executor()); } @Override @@ -944,7 +1008,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelFuture newFailedFuture(Throwable cause) { - return new FailedChannelFuture(channel, null, cause); + return new FailedChannelFuture(channel(), executor(), cause); } @Override @@ -952,50 +1016,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { return voidPromise; } - private void checkDuplicateName(String name) { - if (context0(name) != null) { - throw new IllegalArgumentException("Duplicate handler name: " + name); - } - } - - private AbstractChannelHandlerContext context0(String name) { - AbstractChannelHandlerContext context = head.next; - while (context != tail) { - if (context.name().equals(name)) { - return context; - } - context = context.next; - } - return null; - } - - private AbstractChannelHandlerContext getContextOrDie(String name) { - AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name); - if (ctx == null) { - throw new NoSuchElementException(name); - } else { - return ctx; - } - } - - private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) { - AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); - if (ctx == null) { - throw new NoSuchElementException(handler.getClass().getName()); - } else { - return ctx; - } - } - - private AbstractChannelHandlerContext getContextOrDie(Class handlerType) { - AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType); - if (ctx == null) { - throw new NoSuchElementException(handlerType.getName()); - } else { - return ctx; - } - } - /** * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. @@ -1085,10 +1105,15 @@ public class DefaultChannelPipeline implements ChannelPipeline { final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { - super(pipeline, null, TAIL_NAME, TailContext.class); + super(pipeline, TAIL_NAME, TailContext.class); setAddComplete(); } + @Override + public EventExecutor executor() { + return pipeline().executor(); + } + @Override public ChannelHandler handler() { return this; @@ -1148,11 +1173,16 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { - super(pipeline, null, HEAD_NAME, HeadContext.class); + super(pipeline, HEAD_NAME, HeadContext.class); unsafe = pipeline.channel().unsafe(); setAddComplete(); } + @Override + public EventExecutor executor() { + return pipeline().executor(); + } + @Override public ChannelHandler handler() { return this; diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java index 69cc007bec..ef6fa7c9ce 100644 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java @@ -30,44 +30,6 @@ import static org.junit.Assert.*; public abstract class AbstractEventLoopTest { - /** - * Test for https://github.com/netty/netty/issues/803 - */ - @Test - public void testReregister() { - EventLoopGroup group = newEventLoopGroup(); - final EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(2); - - try { - ServerBootstrap bootstrap = new ServerBootstrap(); - ChannelFuture future = bootstrap.channel(newChannel()).group(group) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - } - }).handler(new ChannelInitializer() { - @Override - public void initChannel(ServerSocketChannel ch) throws Exception { - ch.pipeline().addLast(new TestChannelHandler()); - ch.pipeline().addLast(eventExecutorGroup.next(), new TestChannelHandler2()); - } - }) - .bind(0).awaitUninterruptibly(); - - EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor(); - EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor(); - - future.channel().deregister().awaitUninterruptibly(); - Channel channel = future.channel().register().awaitUninterruptibly().channel(); - EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor(); - assertSame(executor1, executorNew); - assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor()); - } finally { - group.shutdownGracefully(); - eventExecutorGroup.shutdownGracefully(); - } - } - @Test(timeout = 5000) public void testShutdownGracefullyNoQuietPeriod() throws Exception { EventLoopGroup loop = newEventLoopGroup(); @@ -86,13 +48,6 @@ public abstract class AbstractEventLoopTest { assertTrue(loop.isTerminated()); } - private static final class TestChannelHandler extends ChannelDuplexHandler { } - - private static final class TestChannelHandler2 extends ChannelDuplexHandler { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } - } - protected abstract EventLoopGroup newEventLoopGroup(); protected abstract Class newChannel(); } diff --git a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java index 07bf65e0e4..3360d4ed14 100644 --- a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java @@ -22,26 +22,14 @@ import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalHandler; import io.netty.channel.local.LocalServerChannel; -import io.netty.util.concurrent.AbstractEventExecutor; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ScheduledFuture; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.nio.channels.ClosedChannelException; -import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -49,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; public class ChannelInitializerTest { @@ -258,186 +245,6 @@ public class ChannelInitializerTest { } } - @SuppressWarnings("deprecation") - @Test(timeout = 10000) - public void testChannelInitializerEventExecutor() throws Throwable { - final AtomicInteger invokeCount = new AtomicInteger(); - final AtomicInteger completeCount = new AtomicInteger(); - final AtomicReference errorRef = new AtomicReference<>(); - LocalAddress addr = new LocalAddress("test"); - - final EventExecutor executor = new AbstractEventExecutor() { - private final ScheduledExecutorService execService = Executors.newSingleThreadScheduledExecutor(); - - @Override - public boolean inEventLoop(Thread thread) { - return false; - } - - @Override - public boolean isShuttingDown() { - return execService.isShutdown(); - } - - @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { - shutdown(); - return newSucceededFuture(null); - } - - @Override - public Future terminationFuture() { - return newFailedFuture(new UnsupportedOperationException()); - } - - @Override - public void shutdown() { - execService.shutdown(); - } - - @Override - public List shutdownNow() { - return execService.shutdownNow(); - } - - @Override - public boolean isShutdown() { - return execService.isShutdown(); - } - - @Override - public boolean isTerminated() { - return execService.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return execService.awaitTermination(timeout, unit); - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - return execService.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return execService.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { - return execService.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return execService.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - execService.execute(command); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); - } - }; - - final CountDownLatch latch = new CountDownLatch(1); - ServerBootstrap serverBootstrap = new ServerBootstrap() - .channel(LocalServerChannel.class) - .group(group) - .localAddress(addr) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(LocalChannel ch) { - ch.pipeline().addLast(executor, new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) { - invokeCount.incrementAndGet(); - ChannelHandlerContext ctx = ch.pipeline().context(this); - assertNotNull(ctx); - ch.pipeline().addAfter(ctx.executor(), - ctx.name(), null, new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - // just drop on the floor. - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) { - latch.countDown(); - } - }); - completeCount.incrementAndGet(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (cause instanceof AssertionError) { - errorRef.set(cause); - } - } - }); - } - }); - - Channel server = serverBootstrap.bind().sync().channel(); - - Bootstrap clientBootstrap = new Bootstrap() - .channel(LocalChannel.class) - .group(group) - .remoteAddress(addr) - .handler(new ChannelInboundHandlerAdapter()); - - Channel client = clientBootstrap.connect().sync().channel(); - client.writeAndFlush("Hello World").sync(); - - client.close().sync(); - server.close().sync(); - - client.closeFuture().sync(); - server.closeFuture().sync(); - - // Wait until the handler is removed from the pipeline and so no more events are handled by it. - latch.await(); - - assertEquals(1, invokeCount.get()); - assertEquals(invokeCount.get(), completeCount.get()); - - Throwable cause = errorRef.get(); - if (cause != null) { - throw cause; - } - - executor.shutdown(); - assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); - } - private static void closeChannel(Channel c) { if (c != null) { c.close().syncUninterruptibly(); diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java index dfae3be5b2..c769cbf963 100644 --- a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -19,17 +19,11 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.RejectedExecutionHandlers; -import io.netty.util.concurrent.SingleThreadEventExecutor; import org.junit.Test; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import static io.netty.buffer.Unpooled.*; import static org.hamcrest.Matchers.*; @@ -388,87 +382,6 @@ public class ChannelOutboundBufferTest { safeClose(ch); } - @Test(timeout = 5000) - public void testWriteTaskRejected() throws Exception { - final SingleThreadEventExecutor executor = new SingleThreadEventExecutor( - new DefaultThreadFactory("executorPool"), - 1, RejectedExecutionHandlers.reject()) { - - @Override - protected Queue newTaskQueue(int maxPendingTasks) { - return super.newTaskQueue(1); - } - }; - final CountDownLatch handlerAddedLatch = new CountDownLatch(1); - final CountDownLatch handlerRemovedLatch = new CountDownLatch(1); - EmbeddedChannel ch = new EmbeddedChannel(); - ch.pipeline().addLast(executor, "handler", new ChannelOutboundHandlerAdapter() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - promise.setFailure(new AssertionError("Should not be called")); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - handlerAddedLatch.countDown(); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) { - handlerRemovedLatch.countDown(); - } - }); - - // Lets wait until we are sure the handler was added. - handlerAddedLatch.await(); - - final CountDownLatch executeLatch = new CountDownLatch(1); - final CountDownLatch runLatch = new CountDownLatch(1); - executor.execute(() -> { - try { - runLatch.countDown(); - executeLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - - runLatch.await(); - - executor.execute(() -> { - // Will not be executed but ensure the pending count is 1. - }); - - assertEquals(1, executor.pendingTasks()); - assertEquals(0, ch.unsafe().outboundBuffer().totalPendingWriteBytes()); - - ByteBuf buffer = buffer(128).writeZero(128); - ChannelFuture future = ch.write(buffer); - ch.runPendingTasks(); - - assertTrue(future.cause() instanceof RejectedExecutionException); - assertEquals(0, buffer.refCnt()); - - // In case of rejected task we should not have anything pending. - assertEquals(0, ch.unsafe().outboundBuffer().totalPendingWriteBytes()); - executeLatch.countDown(); - - while (executor.pendingTasks() != 0) { - // Wait until there is no more pending task left. - Thread.sleep(10); - } - - ch.pipeline().remove("handler"); - - // Ensure we do not try to shutdown the executor before we handled everything for the Channel. Otherwise - // the Executor may reject when the Channel tries to add a task to it. - handlerRemovedLatch.await(); - - safeClose(ch); - - executor.shutdownGracefully(); - } - private static void safeClose(EmbeddedChannel ch) { ch.finish(); for (;;) { diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 301a673bfa..eb53fa7439 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -32,14 +32,11 @@ import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.AbstractEventExecutor; -import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; -import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor; import org.junit.After; import org.junit.AfterClass; import org.junit.Test; @@ -51,21 +48,17 @@ import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.LockSupport; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -672,37 +665,6 @@ public class DefaultChannelPipelineTest { assertSame(exception, error.get()); } - @Test - public void testChannelUnregistrationWithCustomExecutor() throws Exception { - final CountDownLatch channelLatch = new CountDownLatch(1); - final CountDownLatch handlerLatch = new CountDownLatch(1); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new WrapperExecutor(), - new ChannelInboundHandlerAdapter() { - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - channelLatch.countDown(); - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - handlerLatch.countDown(); - } - }); - } - }); - Channel channel = pipeline.channel(); - channel.register().sync(); - channel.close(); - channel.deregister(); - assertTrue(channelLatch.await(2, TimeUnit.SECONDS)); - assertTrue(handlerLatch.await(2, TimeUnit.SECONDS)); - } - @Test(timeout = 3000) public void testAddHandlerBeforeRegisteredThenRemove() { final EventLoop loop = group.next(); @@ -740,129 +702,6 @@ public class DefaultChannelPipelineTest { pipeline.channel().close().syncUninterruptibly(); } - @Test(timeout = 3000) - public void testHandlerAddedAndRemovedCalledInCorrectOrder() throws Throwable { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - final EventExecutorGroup group2 = new DefaultEventExecutorGroup(1); - - try { - BlockingQueue addedQueue = new LinkedBlockingQueue<>(); - BlockingQueue removedQueue = new LinkedBlockingQueue<>(); - - CheckOrderHandler handler1 = new CheckOrderHandler(addedQueue, removedQueue); - CheckOrderHandler handler2 = new CheckOrderHandler(addedQueue, removedQueue); - CheckOrderHandler handler3 = new CheckOrderHandler(addedQueue, removedQueue); - CheckOrderHandler handler4 = new CheckOrderHandler(addedQueue, removedQueue); - - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(handler1); - pipeline.channel().register().syncUninterruptibly(); - pipeline.addLast(group1.next(), handler2); - pipeline.addLast(group2.next(), handler3); - pipeline.addLast(handler4); - - assertTrue(removedQueue.isEmpty()); - pipeline.channel().close().syncUninterruptibly(); - assertHandler(addedQueue.take(), handler1); - - // Depending on timing this can be handler2 or handler3 as these use different EventExecutorGroups. - assertHandler(addedQueue.take(), handler2, handler3, handler4); - assertHandler(addedQueue.take(), handler2, handler3, handler4); - assertHandler(addedQueue.take(), handler2, handler3, handler4); - - assertTrue(addedQueue.isEmpty()); - - assertHandler(removedQueue.take(), handler4); - assertHandler(removedQueue.take(), handler3); - assertHandler(removedQueue.take(), handler2); - assertHandler(removedQueue.take(), handler1); - assertTrue(removedQueue.isEmpty()); - } finally { - group1.shutdownGracefully(); - group2.shutdownGracefully(); - } - } - - @Test(timeout = 3000) - public void testHandlerAddedExceptionFromChildHandlerIsPropagated() { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - try { - final Promise promise = group1.next().newPromise(); - final Exception exception = new RuntimeException(); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise)); - pipeline.addFirst(new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - throw exception; - } - }); - pipeline.channel().register(); - promise.syncUninterruptibly(); - pipeline.channel().close().syncUninterruptibly(); - } finally { - group1.shutdownGracefully(); - } - } - - @Test(timeout = 3000) - public void testHandlerRemovedExceptionFromChildHandlerIsPropagated() { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - try { - final Promise promise = group1.next().newPromise(); - String handlerName = "foo"; - final Exception exception = new RuntimeException(); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(handlerName, new ChannelHandlerAdapter() { - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - throw exception; - } - }); - pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise)); - pipeline.channel().register().syncUninterruptibly(); - pipeline.remove(handlerName); - promise.syncUninterruptibly(); - pipeline.channel().close().syncUninterruptibly(); - } finally { - group1.shutdownGracefully(); - } - } - - @Test(timeout = 3000) - public void testHandlerAddedThrowsAndRemovedThrowsException() throws InterruptedException { - final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); - try { - final CountDownLatch latch = new CountDownLatch(1); - final Promise promise = group1.next().newPromise(); - final Exception exceptionAdded = new RuntimeException(); - final Exception exceptionRemoved = new RuntimeException(); - String handlerName = "foo"; - ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(group1.next(), new CheckExceptionHandler(exceptionAdded, promise)); - pipeline.addFirst(handlerName, new ChannelHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - throw exceptionAdded; - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - // Execute this later so we are sure the exception is handled first. - ctx.executor().execute(latch::countDown); - throw exceptionRemoved; - } - }); - pipeline.register().syncUninterruptibly(); - latch.await(); - assertNull(pipeline.context(handlerName)); - promise.syncUninterruptibly(); - pipeline.channel().close().syncUninterruptibly(); - } finally { - group1.shutdownGracefully(); - } - } - @Test(timeout = 2000) public void testAddRemoveHandlerCalled() throws Throwable { ChannelPipeline pipeline = newLocalChannel().pipeline(); @@ -1006,76 +845,6 @@ public class DefaultChannelPipelineTest { pipeline.addBefore("test", null, newHandler()); } - @Test(timeout = 3000) - public void testUnorderedEventExecutor() throws Throwable { - EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(2); - EventLoopGroup defaultGroup = new MultithreadEventLoopGroup(1, LocalHandler.newFactory()); - try { - EventLoop eventLoop1 = defaultGroup.next(); - ChannelPipeline pipeline1 = new LocalChannel(eventLoop1).pipeline(); - - pipeline1.channel().register().syncUninterruptibly(); - final CountDownLatch latch = new CountDownLatch(1); - pipeline1.addLast(eventExecutors.next(), new ChannelInboundHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - // Just block one of the two threads. - LockSupport.park(); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - latch.countDown(); - } - }); - // Trigger an event, as we use UnorderedEventExecutor userEventTriggered should be called even when - // handlerAdded(...) blocks. - pipeline1.fireUserEventTriggered(""); - latch.await(); - } finally { - defaultGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).syncUninterruptibly(); - eventExecutors.shutdownGracefully(0, 0, TimeUnit.SECONDS).syncUninterruptibly(); - } - } - - @Test - public void testPinExecutor() { - EventExecutorGroup group = new DefaultEventExecutorGroup(2); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - ChannelPipeline pipeline2 = newLocalChannel().pipeline(); - - EventExecutor executor = group.next(); - pipeline.addLast(executor, "h1", new ChannelInboundHandlerAdapter()); - pipeline.addLast(executor, "h2", new ChannelInboundHandlerAdapter()); - pipeline2.addLast(group.next(), "h3", new ChannelInboundHandlerAdapter()); - - EventExecutor executor1 = pipeline.context("h1").executor(); - EventExecutor executor2 = pipeline.context("h2").executor(); - assertNotNull(executor1); - assertNotNull(executor2); - assertSame(executor1, executor2); - EventExecutor executor3 = pipeline2.context("h3").executor(); - assertNotNull(executor3); - assertNotSame(executor3, executor2); - group.shutdownGracefully(0, 0, TimeUnit.SECONDS); - } - - @Test - public void testNotPinExecutor() { - EventExecutorGroup group = new DefaultEventExecutorGroup(2); - ChannelPipeline pipeline = newLocalChannel().pipeline(); - - pipeline.addLast(group.next(), "h1", new ChannelInboundHandlerAdapter()); - pipeline.addLast(group.next(), "h2", new ChannelInboundHandlerAdapter()); - - EventExecutor executor1 = pipeline.context("h1").executor(); - EventExecutor executor2 = pipeline.context("h2").executor(); - assertNotNull(executor1); - assertNotNull(executor2); - assertNotSame(executor1, executor2); - group.shutdownGracefully(0, 0, TimeUnit.SECONDS); - } - @Test(timeout = 3000) public void testVoidPromiseNotify() throws Throwable { EventLoopGroup defaultGroup = new MultithreadEventLoopGroup(1, LocalHandler.newFactory()); @@ -1832,13 +1601,21 @@ public class DefaultChannelPipelineTest { } private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) { - AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext(); - int handlerNumber = 0; - while (ctx != ((DefaultChannelPipeline) pipeline).tail) { - handlerNumber++; - ctx = ctx.next; - } - assertEquals(expectedNumber, handlerNumber); + assertEquals(expectedNumber, pipeline.names().size()); + assertEquals(expectedNumber, pipeline.toMap().size()); + + pipeline.executor().submit(new Runnable() { + @Override + public void run() { + AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext(); + int handlerNumber = 0; + while (ctx != ((DefaultChannelPipeline) pipeline).tail) { + handlerNumber++; + ctx = ctx.next; + } + assertEquals(expectedNumber, handlerNumber); + } + }).syncUninterruptibly(); } private static ChannelHandler[] newHandlers(int num) { diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java deleted file mode 100644 index ba5f8e8549..0000000000 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ /dev/null @@ -1,597 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.HashSet; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicReference; - -public class LocalTransportThreadModelTest { - - private static EventLoopGroup group; - private static LocalAddress localAddr; - - @BeforeClass - public static void init() { - // Configure a test server - group = new MultithreadEventLoopGroup(LocalHandler.newFactory()); - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - // Discard - ReferenceCountUtil.release(msg); - } - }); - } - }); - - localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress(); - } - - @AfterClass - public static void destroy() throws Exception { - group.shutdownGracefully().sync(); - } - - @Test(timeout = 30000) - @Ignore("regression test") - public void testStagedExecutionMultiple() throws Throwable { - for (int i = 0; i < 10; i ++) { - testStagedExecution(); - } - } - - @Test(timeout = 5000) - public void testStagedExecution() throws Throwable { - EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"), - LocalHandler.newFactory()); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - ThreadNameAuditor h1 = new ThreadNameAuditor(); - ThreadNameAuditor h2 = new ThreadNameAuditor(); - ThreadNameAuditor h3 = new ThreadNameAuditor(true); - - Channel ch = new LocalChannel(l.next()); - // With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'. - ch.pipeline().addLast(h1); - // h2 will be always invoked by EventExecutor 'e1'. - ch.pipeline().addLast(e1.next(), h2); - // h3 will be always invoked by EventExecutor 'e2'. - ch.pipeline().addLast(e2.next(), h3); - - ch.register().sync().channel().connect(localAddr).sync(); - - // Fire inbound events from all possible starting points. - ch.pipeline().fireChannelRead("1"); - ch.pipeline().context(h1).fireChannelRead("2"); - ch.pipeline().context(h2).fireChannelRead("3"); - ch.pipeline().context(h3).fireChannelRead("4"); - // Fire outbound events from all possible starting points. - ch.pipeline().write("5"); - ch.pipeline().context(h3).write("6"); - ch.pipeline().context(h2).write("7"); - ch.pipeline().context(h1).writeAndFlush("8").sync(); - - ch.close().sync(); - - // Wait until all events are handled completely. - while (h1.outboundThreadNames.size() < 3 || h3.inboundThreadNames.size() < 3 || - h1.removalThreadNames.size() < 1) { - if (h1.exception.get() != null) { - throw h1.exception.get(); - } - if (h2.exception.get() != null) { - throw h2.exception.get(); - } - if (h3.exception.get() != null) { - throw h3.exception.get(); - } - - Thread.sleep(10); - } - - String currentName = Thread.currentThread().getName(); - - try { - // Events should never be handled from the current thread. - Assert.assertFalse(h1.inboundThreadNames.contains(currentName)); - Assert.assertFalse(h2.inboundThreadNames.contains(currentName)); - Assert.assertFalse(h3.inboundThreadNames.contains(currentName)); - Assert.assertFalse(h1.outboundThreadNames.contains(currentName)); - Assert.assertFalse(h2.outboundThreadNames.contains(currentName)); - Assert.assertFalse(h3.outboundThreadNames.contains(currentName)); - Assert.assertFalse(h1.removalThreadNames.contains(currentName)); - Assert.assertFalse(h2.removalThreadNames.contains(currentName)); - Assert.assertFalse(h3.removalThreadNames.contains(currentName)); - - // Assert that events were handled by the correct executor. - for (String name: h1.inboundThreadNames) { - Assert.assertTrue(name.startsWith("l-")); - } - for (String name: h2.inboundThreadNames) { - Assert.assertTrue(name.startsWith("e1-")); - } - for (String name: h3.inboundThreadNames) { - Assert.assertTrue(name.startsWith("e2-")); - } - for (String name: h1.outboundThreadNames) { - Assert.assertTrue(name.startsWith("l-")); - } - for (String name: h2.outboundThreadNames) { - Assert.assertTrue(name.startsWith("e1-")); - } - for (String name: h3.outboundThreadNames) { - Assert.assertTrue(name.startsWith("e2-")); - } - for (String name: h1.removalThreadNames) { - Assert.assertTrue(name.startsWith("l-")); - } - for (String name: h2.removalThreadNames) { - Assert.assertTrue(name.startsWith("e1-")); - } - for (String name: h3.removalThreadNames) { - Assert.assertTrue(name.startsWith("e2-")); - } - - // Assert that the events for the same handler were handled by the same thread. - Set names = new HashSet<>(); - names.addAll(h1.inboundThreadNames); - names.addAll(h1.outboundThreadNames); - names.addAll(h1.removalThreadNames); - Assert.assertEquals(1, names.size()); - - names.clear(); - names.addAll(h2.inboundThreadNames); - names.addAll(h2.outboundThreadNames); - names.addAll(h2.removalThreadNames); - Assert.assertEquals(1, names.size()); - - names.clear(); - names.addAll(h3.inboundThreadNames); - names.addAll(h3.outboundThreadNames); - names.addAll(h3.removalThreadNames); - Assert.assertEquals(1, names.size()); - - // Count the number of events - Assert.assertEquals(1, h1.inboundThreadNames.size()); - Assert.assertEquals(2, h2.inboundThreadNames.size()); - Assert.assertEquals(3, h3.inboundThreadNames.size()); - Assert.assertEquals(3, h1.outboundThreadNames.size()); - Assert.assertEquals(2, h2.outboundThreadNames.size()); - Assert.assertEquals(1, h3.outboundThreadNames.size()); - Assert.assertEquals(1, h1.removalThreadNames.size()); - Assert.assertEquals(1, h2.removalThreadNames.size()); - Assert.assertEquals(1, h3.removalThreadNames.size()); - } catch (AssertionError e) { - System.out.println("H1I: " + h1.inboundThreadNames); - System.out.println("H2I: " + h2.inboundThreadNames); - System.out.println("H3I: " + h3.inboundThreadNames); - System.out.println("H1O: " + h1.outboundThreadNames); - System.out.println("H2O: " + h2.outboundThreadNames); - System.out.println("H3O: " + h3.outboundThreadNames); - System.out.println("H1R: " + h1.removalThreadNames); - System.out.println("H2R: " + h2.removalThreadNames); - System.out.println("H3R: " + h3.removalThreadNames); - throw e; - } finally { - l.shutdownGracefully(); - e1.shutdownGracefully(); - e2.shutdownGracefully(); - - l.terminationFuture().sync(); - e1.terminationFuture().sync(); - e2.terminationFuture().sync(); - } - } - - @Test(timeout = 30000) - @Ignore - public void testConcurrentMessageBufferAccess() throws Throwable { - EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"), - LocalHandler.newFactory()); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); - EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4")); - EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5")); - - try { - final MessageForwarder1 h1 = new MessageForwarder1(); - final MessageForwarder2 h2 = new MessageForwarder2(); - final MessageForwarder3 h3 = new MessageForwarder3(); - final MessageForwarder1 h4 = new MessageForwarder1(); - final MessageForwarder2 h5 = new MessageForwarder2(); - final MessageDiscarder h6 = new MessageDiscarder(); - - final Channel ch = new LocalChannel(l.next()); - - // inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null - // outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null - ch.pipeline().addLast(h1) - .addLast(e1.next(), h2) - .addLast(e2.next(), h3) - .addLast(e3.next(), h4) - .addLast(e4.next(), h5) - .addLast(e5.next(), h6); - - ch.register().sync().channel().connect(localAddr).sync(); - - final int ROUNDS = 1024; - final int ELEMS_PER_ROUNDS = 8192; - final int TOTAL_CNT = ROUNDS * ELEMS_PER_ROUNDS; - for (int i = 0; i < TOTAL_CNT;) { - final int start = i; - final int end = i + ELEMS_PER_ROUNDS; - i = end; - - ch.eventLoop().execute(() -> { - for (int j = start; j < end; j ++) { - ch.pipeline().fireChannelRead(Integer.valueOf(j)); - } - }); - } - - while (h1.inCnt < TOTAL_CNT || h2.inCnt < TOTAL_CNT || h3.inCnt < TOTAL_CNT || - h4.inCnt < TOTAL_CNT || h5.inCnt < TOTAL_CNT || h6.inCnt < TOTAL_CNT) { - if (h1.exception.get() != null) { - throw h1.exception.get(); - } - if (h2.exception.get() != null) { - throw h2.exception.get(); - } - if (h3.exception.get() != null) { - throw h3.exception.get(); - } - if (h4.exception.get() != null) { - throw h4.exception.get(); - } - if (h5.exception.get() != null) { - throw h5.exception.get(); - } - if (h6.exception.get() != null) { - throw h6.exception.get(); - } - Thread.sleep(10); - } - - for (int i = 0; i < TOTAL_CNT;) { - final int start = i; - final int end = i + ELEMS_PER_ROUNDS; - i = end; - - ch.pipeline().context(h6).executor().execute(() -> { - for (int j = start; j < end; j ++) { - ch.write(Integer.valueOf(j)); - } - ch.flush(); - }); - } - - while (h1.outCnt < TOTAL_CNT || h2.outCnt < TOTAL_CNT || h3.outCnt < TOTAL_CNT || - h4.outCnt < TOTAL_CNT || h5.outCnt < TOTAL_CNT || h6.outCnt < TOTAL_CNT) { - if (h1.exception.get() != null) { - throw h1.exception.get(); - } - if (h2.exception.get() != null) { - throw h2.exception.get(); - } - if (h3.exception.get() != null) { - throw h3.exception.get(); - } - if (h4.exception.get() != null) { - throw h4.exception.get(); - } - if (h5.exception.get() != null) { - throw h5.exception.get(); - } - if (h6.exception.get() != null) { - throw h6.exception.get(); - } - Thread.sleep(10); - } - - ch.close().sync(); - } finally { - l.shutdownGracefully(); - e1.shutdownGracefully(); - e2.shutdownGracefully(); - e3.shutdownGracefully(); - e4.shutdownGracefully(); - e5.shutdownGracefully(); - - l.terminationFuture().sync(); - e1.terminationFuture().sync(); - e2.terminationFuture().sync(); - e3.terminationFuture().sync(); - e4.terminationFuture().sync(); - e5.terminationFuture().sync(); - } - } - - private static class ThreadNameAuditor extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - - private final Queue inboundThreadNames = new ConcurrentLinkedQueue<>(); - private final Queue outboundThreadNames = new ConcurrentLinkedQueue<>(); - private final Queue removalThreadNames = new ConcurrentLinkedQueue<>(); - private final boolean discard; - - ThreadNameAuditor() { - this(false); - } - - ThreadNameAuditor(boolean discard) { - this.discard = discard; - } - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - removalThreadNames.add(Thread.currentThread().getName()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundThreadNames.add(Thread.currentThread().getName()); - if (!discard) { - ctx.fireChannelRead(msg); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - outboundThreadNames.add(Thread.currentThread().getName()); - ctx.write(msg, promise); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - System.err.print('[' + Thread.currentThread().getName() + "] "); - cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } - - /** - * Converts integers into a binary stream. - */ - private static class MessageForwarder1 extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - private volatile int inCnt; - private volatile int outCnt; - private volatile Thread t; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Thread t = this.t; - if (t == null) { - this.t = Thread.currentThread(); - } else { - Assert.assertSame(t, Thread.currentThread()); - } - - ByteBuf out = ctx.alloc().buffer(4); - int m = ((Integer) msg).intValue(); - int expected = inCnt ++; - Assert.assertEquals(expected, m); - out.writeInt(m); - - ctx.fireChannelRead(out); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); - - // Don't let the write request go to the server-side channel - just swallow. - boolean swallow = this == ctx.pipeline().first(); - - ByteBuf m = (ByteBuf) msg; - int count = m.readableBytes() / 4; - for (int j = 0; j < count; j ++) { - int actual = m.readInt(); - int expected = outCnt ++; - Assert.assertEquals(expected, actual); - if (!swallow) { - ctx.write(actual); - } - } - ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise); - m.release(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - //System.err.print("[" + Thread.currentThread().getName() + "] "); - //cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } - - /** - * Converts a binary stream into integers. - */ - private static class MessageForwarder2 extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - private volatile int inCnt; - private volatile int outCnt; - private volatile Thread t; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Thread t = this.t; - if (t == null) { - this.t = Thread.currentThread(); - } else { - Assert.assertSame(t, Thread.currentThread()); - } - - ByteBuf m = (ByteBuf) msg; - int count = m.readableBytes() / 4; - for (int j = 0; j < count; j ++) { - int actual = m.readInt(); - int expected = inCnt ++; - Assert.assertEquals(expected, actual); - ctx.fireChannelRead(actual); - } - m.release(); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); - - ByteBuf out = ctx.alloc().buffer(4); - int m = (Integer) msg; - int expected = outCnt ++; - Assert.assertEquals(expected, m); - out.writeInt(m); - - ctx.write(out, promise); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - //System.err.print("[" + Thread.currentThread().getName() + "] "); - //cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } - - /** - * Simply forwards the received object to the next handler. - */ - private static class MessageForwarder3 extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - private volatile int inCnt; - private volatile int outCnt; - private volatile Thread t; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Thread t = this.t; - if (t == null) { - this.t = Thread.currentThread(); - } else { - Assert.assertSame(t, Thread.currentThread()); - } - - int actual = (Integer) msg; - int expected = inCnt ++; - Assert.assertEquals(expected, actual); - - ctx.fireChannelRead(msg); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); - - int actual = (Integer) msg; - int expected = outCnt ++; - Assert.assertEquals(expected, actual); - - ctx.write(msg, promise); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - System.err.print('[' + Thread.currentThread().getName() + "] "); - cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } - - /** - * Discards all received messages. - */ - private static class MessageDiscarder extends ChannelDuplexHandler { - - private final AtomicReference exception = new AtomicReference<>(); - private volatile int inCnt; - private volatile int outCnt; - private volatile Thread t; - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - Thread t = this.t; - if (t == null) { - this.t = Thread.currentThread(); - } else { - Assert.assertSame(t, Thread.currentThread()); - } - - int actual = (Integer) msg; - int expected = inCnt ++; - Assert.assertEquals(expected, actual); - } - - @Override - public void write( - ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - Assert.assertSame(t, Thread.currentThread()); - - int actual = (Integer) msg; - int expected = outCnt ++; - Assert.assertEquals(expected, actual); - ctx.write(msg, promise); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - exception.compareAndSet(null, cause); - //System.err.print("[" + Thread.currentThread().getName() + "] "); - //cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - } -} diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java deleted file mode 100644 index 0be7647195..0000000000 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.Deque; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedDeque; - -public class LocalTransportThreadModelTest3 { - - enum EventType { - EXCEPTION_CAUGHT, - USER_EVENT, - MESSAGE_RECEIVED_LAST, - INACTIVE, - ACTIVE, - UNREGISTERED, - REGISTERED, - MESSAGE_RECEIVED, - WRITE, - READ - } - - private static EventLoopGroup group; - private static LocalAddress localAddr; - - @BeforeClass - public static void init() { - // Configure a test server - group = new MultithreadEventLoopGroup(LocalHandler.newFactory()); - ServerBootstrap sb = new ServerBootstrap(); - sb.group(group) - .channel(LocalServerChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(LocalChannel ch) throws Exception { - ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - // Discard - ReferenceCountUtil.release(msg); - } - }); - } - }); - - localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress(); - } - - @AfterClass - public static void destroy() throws Exception { - group.shutdownGracefully().sync(); - } - - @Test(timeout = 60000) - @Ignore("regression test") - public void testConcurrentAddRemoveInboundEventsMultiple() throws Throwable { - for (int i = 0; i < 50; i ++) { - testConcurrentAddRemoveInboundEvents(); - } - } - - @Test(timeout = 60000) - @Ignore("regression test") - public void testConcurrentAddRemoveOutboundEventsMultiple() throws Throwable { - for (int i = 0; i < 50; i ++) { - testConcurrentAddRemoveOutboundEvents(); - } - } - - @Test(timeout = 30000) - @Ignore("needs a fix") - public void testConcurrentAddRemoveInboundEvents() throws Throwable { - testConcurrentAddRemove(true); - } - - @Test(timeout = 30000) - @Ignore("needs a fix") - public void testConcurrentAddRemoveOutboundEvents() throws Throwable { - testConcurrentAddRemove(false); - } - - private static void testConcurrentAddRemove(boolean inbound) throws Exception { - EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"), - LocalHandler.newFactory()); - EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); - EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); - EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); - EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4")); - EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5")); - - final EventExecutorGroup[] groups = {e1, e2, e3, e4, e5}; - try { - Deque events = new ConcurrentLinkedDeque<>(); - final EventForwarder h1 = new EventForwarder(); - final EventForwarder h2 = new EventForwarder(); - final EventForwarder h3 = new EventForwarder(); - final EventForwarder h4 = new EventForwarder(); - final EventForwarder h5 = new EventForwarder(); - final EventRecorder h6 = new EventRecorder(events, inbound); - - final Channel ch = new LocalChannel(l.next()); - if (!inbound) { - ch.config().setAutoRead(false); - } - ch.pipeline().addLast(e1.next(), h1) - .addLast(e1.next(), h2) - .addLast(e1.next(), h3) - .addLast(e1.next(), h4) - .addLast(e1.next(), h5) - .addLast(e1.next(), "recorder", h6); - - ch.register().sync().channel().connect(localAddr).sync(); - - final LinkedList expectedEvents = events(inbound, 8192); - - Throwable cause = new Throwable(); - - Thread pipelineModifier = new Thread(() -> { - Random random = new Random(); - - while (true) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - return; - } - if (!ch.isRegistered()) { - continue; - } - //EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)]; - ChannelHandler handler = ch.pipeline().removeFirst(); - ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder", - UUID.randomUUID().toString(), handler); - } - }); - pipelineModifier.setDaemon(true); - pipelineModifier.start(); - for (EventType event: expectedEvents) { - switch (event) { - case EXCEPTION_CAUGHT: - ch.pipeline().fireExceptionCaught(cause); - break; - case MESSAGE_RECEIVED: - ch.pipeline().fireChannelRead(""); - break; - case MESSAGE_RECEIVED_LAST: - ch.pipeline().fireChannelReadComplete(); - break; - case USER_EVENT: - ch.pipeline().fireUserEventTriggered(""); - break; - case WRITE: - ch.pipeline().write(""); - break; - case READ: - ch.pipeline().read(); - break; - } - } - - ch.close().sync(); - - while (events.peekLast() != EventType.UNREGISTERED) { - Thread.sleep(10); - } - - expectedEvents.addFirst(EventType.ACTIVE); - expectedEvents.addFirst(EventType.REGISTERED); - expectedEvents.addLast(EventType.INACTIVE); - expectedEvents.addLast(EventType.UNREGISTERED); - - for (;;) { - EventType event = events.poll(); - if (event == null) { - Assert.assertTrue("Missing events:" + expectedEvents, expectedEvents.isEmpty()); - break; - } - Assert.assertEquals(event, expectedEvents.poll()); - } - } finally { - l.shutdownGracefully(); - e1.shutdownGracefully(); - e2.shutdownGracefully(); - e3.shutdownGracefully(); - e4.shutdownGracefully(); - e5.shutdownGracefully(); - - l.terminationFuture().sync(); - e1.terminationFuture().sync(); - e2.terminationFuture().sync(); - e3.terminationFuture().sync(); - e4.terminationFuture().sync(); - e5.terminationFuture().sync(); - } - } - - private static LinkedList events(boolean inbound, int size) { - EventType[] events; - if (inbound) { - events = new EventType[] { - EventType.USER_EVENT, EventType.MESSAGE_RECEIVED, EventType.MESSAGE_RECEIVED_LAST, - EventType.EXCEPTION_CAUGHT}; - } else { - events = new EventType[] { - EventType.READ, EventType.WRITE, EventType.EXCEPTION_CAUGHT }; - } - - Random random = new Random(); - LinkedList expectedEvents = new LinkedList<>(); - for (int i = 0; i < size; i++) { - expectedEvents.add(events[random.nextInt(events.length)]); - } - return expectedEvents; - } - - @ChannelHandler.Sharable - private static final class EventForwarder extends ChannelDuplexHandler { } - - private static final class EventRecorder extends ChannelDuplexHandler { - private final Queue events; - private final boolean inbound; - - EventRecorder(Queue events, boolean inbound) { - this.events = events; - this.inbound = inbound; - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - events.add(EventType.EXCEPTION_CAUGHT); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (inbound) { - events.add(EventType.USER_EVENT); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - if (inbound) { - events.add(EventType.MESSAGE_RECEIVED_LAST); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.INACTIVE); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.ACTIVE); - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.UNREGISTERED); - } - - @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.REGISTERED); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (inbound) { - events.add(EventType.MESSAGE_RECEIVED); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (!inbound) { - events.add(EventType.WRITE); - } - promise.setSuccess(); - } - - @Override - public void read(ChannelHandlerContext ctx) { - if (!inbound) { - events.add(EventType.READ); - } - } - } -}