From 5cfb107822442cc5f3da9d5395e12dace09dca71 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 23 Jan 2019 07:44:32 +0100 Subject: [PATCH] Leave responsibility to choose EventExecutor to the user (#8746) Motivation: We should leave the responsibility to choose the EventExecutor for a ChannelHandler to the user for more flexibility and to keep things simple. Modification: - Change method signatures to take an EventExecutor and not an EventExecutorGroup - Remove special ChannelOption that allowed to enable / disable EventExecutor pinning Result: Simpler and more flexible code. --- .../transport/socket/SocketEchoTest.java | 4 +- .../transport/socket/SocketStartTlsTest.java | 4 +- .../java/io/netty/channel/ChannelOption.java | 3 - .../io/netty/channel/ChannelPipeline.java | 27 +++---- .../netty/channel/DefaultChannelConfig.java | 19 +---- .../netty/channel/DefaultChannelPipeline.java | 73 ++++++------------- .../netty/channel/AbstractEventLoopTest.java | 2 +- .../channel/DefaultChannelPipelineTest.java | 24 +++--- .../local/LocalTransportThreadModelTest.java | 14 ++-- .../local/LocalTransportThreadModelTest3.java | 14 ++-- 10 files changed, 70 insertions(+), 114 deletions(-) diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index e2a63e00d7..4c4ee73faf 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -132,13 +132,13 @@ public class SocketEchoTest extends AbstractSocketTest { sb.childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel c) throws Exception { - c.pipeline().addLast(group, sh); + c.pipeline().addLast(group.next(), sh); } }); cb.handler(new ChannelInitializer() { @Override protected void initChannel(Channel c) throws Exception { - c.pipeline().addLast(group, ch); + c.pipeline().addLast(group.next(), ch); } }); } else { diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index 6a2b6b6e9b..973e92be1d 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -159,7 +159,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); - p.addLast(executor, sh); + p.addLast(executor.next(), sh); } }); @@ -169,7 +169,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); - p.addLast(executor, ch); + p.addLast(executor.next(), ch); } }); diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index 21b52e798b..481cc69b0f 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -126,9 +126,6 @@ public class ChannelOption extends AbstractConstant> { public static final ChannelOption DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION = valueOf("DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION"); - public static final ChannelOption SINGLE_EVENTEXECUTOR_PER_GROUP = - valueOf("SINGLE_EVENTEXECUTOR_PER_GROUP"); - /** * Creates a new {@link ChannelOption} with the specified unique {@code name}. */ diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index c415d8536d..8a8571af15 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -17,6 +17,7 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import java.net.SocketAddress; @@ -204,7 +205,7 @@ import java.util.NoSuchElementException; * // a time-consuming task. * // If your business logic is fully asynchronous or finished very quickly, you don't * // need to specify a group. - * pipeline.addLast(group, "handler", new MyBusinessLogicHandler()); + * pipeline.addLast(group.next(), "handler", new MyBusinessLogicHandler()); * * *

Thread safety

@@ -232,7 +233,7 @@ public interface ChannelPipeline /** * Inserts a {@link ChannelHandler} at the first position of this pipeline. * - * @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler} + * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} * methods * @param name the name of the handler to insert first * @param handler the handler to insert first @@ -242,7 +243,7 @@ public interface ChannelPipeline * @throws NullPointerException * if the specified handler is {@code null} */ - ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); + ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler); /** * Appends a {@link ChannelHandler} at the last position of this pipeline. @@ -260,7 +261,7 @@ public interface ChannelPipeline /** * Appends a {@link ChannelHandler} at the last position of this pipeline. * - * @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler} + * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} * methods * @param name the name of the handler to append * @param handler the handler to append @@ -270,7 +271,7 @@ public interface ChannelPipeline * @throws NullPointerException * if the specified handler is {@code null} */ - ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); + ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler); /** * Inserts a {@link ChannelHandler} before an existing handler of this @@ -293,7 +294,7 @@ public interface ChannelPipeline * Inserts a {@link ChannelHandler} before an existing handler of this * pipeline. * - * @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler} + * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} * methods * @param baseName the name of the existing handler * @param name the name of the handler to insert before @@ -306,7 +307,7 @@ public interface ChannelPipeline * @throws NullPointerException * if the specified baseName or handler is {@code null} */ - ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); + ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler); /** * Inserts a {@link ChannelHandler} after an existing handler of this @@ -329,7 +330,7 @@ public interface ChannelPipeline * Inserts a {@link ChannelHandler} after an existing handler of this * pipeline. * - * @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler} + * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler} * methods * @param baseName the name of the existing handler * @param name the name of the handler to insert after @@ -342,7 +343,7 @@ public interface ChannelPipeline * @throws NullPointerException * if the specified baseName or handler is {@code null} */ - ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); + ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler); /** * Inserts {@link ChannelHandler}s at the first position of this pipeline. @@ -355,12 +356,12 @@ public interface ChannelPipeline /** * Inserts {@link ChannelHandler}s at the first position of this pipeline. * - * @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}s + * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s * methods. * @param handlers the handlers to insert first * */ - ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); + ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers); /** * Inserts {@link ChannelHandler}s at the last position of this pipeline. @@ -373,12 +374,12 @@ public interface ChannelPipeline /** * Inserts {@link ChannelHandler}s at the last position of this pipeline. * - * @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}s + * @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s * methods. * @param handlers the handlers to insert last * */ - ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); + ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers); /** * Removes the specified {@link ChannelHandler} from this pipeline. diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index c3b8dd1927..dcb8adedf4 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -30,7 +30,6 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ; import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR; import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR; -import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK; import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK; import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK; @@ -81,8 +80,7 @@ public class DefaultChannelConfig implements ChannelConfig { null, CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK, - WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR, - SINGLE_EVENTEXECUTOR_PER_GROUP); + WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR); } protected Map, Object> getOptions( @@ -153,9 +151,6 @@ public class DefaultChannelConfig implements ChannelConfig { if (option == MESSAGE_SIZE_ESTIMATOR) { return (T) getMessageSizeEstimator(); } - if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { - return (T) Boolean.valueOf(getPinEventExecutorPerGroup()); - } return null; } @@ -186,8 +181,6 @@ public class DefaultChannelConfig implements ChannelConfig { setWriteBufferWaterMark((WriteBufferWaterMark) value); } else if (option == MESSAGE_SIZE_ESTIMATOR) { setMessageSizeEstimator((MessageSizeEstimator) value); - } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { - setPinEventExecutorPerGroup((Boolean) value); } else { return false; } @@ -426,14 +419,4 @@ public class DefaultChannelConfig implements ChannelConfig { msgSizeEstimator = estimator; return this; } - - private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) { - this.pinEventExecutor = pinEventExecutor; - return this; - } - - private boolean getPinEventExecutorPerGroup() { - return pinEventExecutor; - } - } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index c968bc9718..e7f47d0f91 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -19,7 +19,6 @@ import io.netty.channel.Channel.Unsafe; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; @@ -29,7 +28,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.ArrayList; -import java.util.IdentityHashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -68,7 +66,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled(); - private Map childExecutors; private volatile MessageSizeEstimator.Handle estimatorHandle; protected DefaultChannelPipeline(Channel channel) { @@ -98,32 +95,10 @@ public class DefaultChannelPipeline implements ChannelPipeline { return touch ? ReferenceCountUtil.touch(msg, next) : msg; } - private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { - return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); + private AbstractChannelHandlerContext newContext(EventExecutor executor, String name, ChannelHandler handler) { + return new DefaultChannelHandlerContext(this, executor, name, handler); } - private EventExecutor childExecutor(EventExecutorGroup group) { - if (group == null) { - return null; - } - Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP); - if (pinEventExecutor != null && !pinEventExecutor) { - return group.next(); - } - Map childExecutors = this.childExecutors; - if (childExecutors == null) { - // Use size of 4 as most people only use one extra EventExecutor. - childExecutors = this.childExecutors = new IdentityHashMap<>(4); - } - // Pin one of the child executors once and remember it so that the same child executor - // is used to fire events for the same channel. - EventExecutor childExecutor = childExecutors.get(group); - if (childExecutor == null) { - childExecutor = group.next(); - childExecutors.put(group, childExecutor); - } - return childExecutor; - } @Override public final Channel channel() { return channel; @@ -135,20 +110,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { + public final ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); name = filterName(name, handler); - newCtx = newContext(group, name, handler); + newCtx = newContext(executor, name, handler); addFirst0(newCtx); - EventExecutor executor = newCtx.executor(); - if (!executor.inEventLoop()) { + EventExecutor ctxExecutor = newCtx.executor(); + if (!ctxExecutor.inEventLoop()) { newCtx.setAddPending(); - executor.execute(new Runnable() { + ctxExecutor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); @@ -175,19 +150,19 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { + public final ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); - newCtx = newContext(group, filterName(name, handler), handler); + newCtx = newContext(executor, filterName(name, handler), handler); addLast0(newCtx); - EventExecutor executor = newCtx.executor(); - if (!executor.inEventLoop()) { + EventExecutor ctxExecutor = newCtx.executor(); + if (!ctxExecutor.inEventLoop()) { newCtx.setAddPending(); - executor.execute(new Runnable() { + ctxExecutor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); @@ -215,7 +190,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPipeline addBefore( - EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { + EventExecutor executor, String baseName, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; synchronized (this) { @@ -223,14 +198,14 @@ public class DefaultChannelPipeline implements ChannelPipeline { name = filterName(name, handler); ctx = getContextOrDie(baseName); - newCtx = newContext(group, name, handler); + newCtx = newContext(executor, name, handler); addBefore0(ctx, newCtx); - EventExecutor executor = newCtx.executor(); - if (!executor.inEventLoop()) { + EventExecutor ctxExecutor = newCtx.executor(); + if (!ctxExecutor.inEventLoop()) { newCtx.setAddPending(); - executor.execute(new Runnable() { + ctxExecutor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); @@ -265,7 +240,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final ChannelPipeline addAfter( - EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { + EventExecutor executor, String baseName, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; @@ -274,14 +249,14 @@ public class DefaultChannelPipeline implements ChannelPipeline { name = filterName(name, handler); ctx = getContextOrDie(baseName); - newCtx = newContext(group, name, handler); + newCtx = newContext(executor, name, handler); addAfter0(ctx, newCtx); - EventExecutor executor = newCtx.executor(); - if (!executor.inEventLoop()) { + EventExecutor ctxExecutor = newCtx.executor(); + if (!ctxExecutor.inEventLoop()) { newCtx.setAddPending(); - executor.execute(new Runnable() { + ctxExecutor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); @@ -311,7 +286,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { + public final ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -344,7 +319,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { + public final ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java index 76e75e3fba..69cc007bec 100644 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java @@ -49,7 +49,7 @@ public abstract class AbstractEventLoopTest { @Override public void initChannel(ServerSocketChannel ch) throws Exception { ch.pipeline().addLast(new TestChannelHandler()); - ch.pipeline().addLast(eventExecutorGroup, new TestChannelHandler2()); + ch.pipeline().addLast(eventExecutorGroup.next(), new TestChannelHandler2()); } }) .bind(0).awaitUninterruptibly(); diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 72da584a64..132fda97c6 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -776,8 +776,8 @@ public class DefaultChannelPipelineTest { ChannelPipeline pipeline = newLocalChannel().pipeline(); pipeline.addLast(handler1); pipeline.channel().register().syncUninterruptibly(); - pipeline.addLast(group1, handler2); - pipeline.addLast(group2, handler3); + pipeline.addLast(group1.next(), handler2); + pipeline.addLast(group2.next(), handler3); pipeline.addLast(handler4); assertTrue(removedQueue.isEmpty()); @@ -809,7 +809,7 @@ public class DefaultChannelPipelineTest { final Promise promise = group1.next().newPromise(); final Exception exception = new RuntimeException(); ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(group1, new CheckExceptionHandler(exception, promise)); + pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise)); pipeline.addFirst(new ChannelHandlerAdapter() { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @@ -838,7 +838,7 @@ public class DefaultChannelPipelineTest { throw exception; } }); - pipeline.addLast(group1, new CheckExceptionHandler(exception, promise)); + pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise)); pipeline.channel().register().syncUninterruptibly(); pipeline.remove(handlerName); promise.syncUninterruptibly(); @@ -858,7 +858,7 @@ public class DefaultChannelPipelineTest { final Exception exceptionRemoved = new RuntimeException(); String handlerName = "foo"; ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.addLast(group1, new CheckExceptionHandler(exceptionAdded, promise)); + pipeline.addLast(group1.next(), new CheckExceptionHandler(exceptionAdded, promise)); pipeline.addFirst(handlerName, new ChannelHandlerAdapter() { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @@ -1043,7 +1043,7 @@ public class DefaultChannelPipelineTest { pipeline1.channel().register().syncUninterruptibly(); final CountDownLatch latch = new CountDownLatch(1); - pipeline1.addLast(eventExecutors, new ChannelInboundHandlerAdapter() { + pipeline1.addLast(eventExecutors.next(), new ChannelInboundHandlerAdapter() { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // Just block one of the two threads. @@ -1071,9 +1071,10 @@ public class DefaultChannelPipelineTest { ChannelPipeline pipeline = newLocalChannel().pipeline(); ChannelPipeline pipeline2 = newLocalChannel().pipeline(); - pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter()); - pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter()); - pipeline2.addLast(group, "h3", new ChannelInboundHandlerAdapter()); + EventExecutor executor = group.next(); + pipeline.addLast(executor, "h1", new ChannelInboundHandlerAdapter()); + pipeline.addLast(executor, "h2", new ChannelInboundHandlerAdapter()); + pipeline2.addLast(group.next(), "h3", new ChannelInboundHandlerAdapter()); EventExecutor executor1 = pipeline.context("h1").executor(); EventExecutor executor2 = pipeline.context("h2").executor(); @@ -1090,10 +1091,9 @@ public class DefaultChannelPipelineTest { public void testNotPinExecutor() { EventExecutorGroup group = new DefaultEventExecutorGroup(2); ChannelPipeline pipeline = newLocalChannel().pipeline(); - pipeline.channel().config().setOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false); - pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter()); - pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter()); + pipeline.addLast(group.next(), "h1", new ChannelInboundHandlerAdapter()); + pipeline.addLast(group.next(), "h2", new ChannelInboundHandlerAdapter()); EventExecutor executor1 = pipeline.context("h1").executor(); EventExecutor executor2 = pipeline.context("h2").executor(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 844f4efaa7..a6311d5743 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -95,9 +95,9 @@ public class LocalTransportThreadModelTest { // With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'. ch.pipeline().addLast(h1); // h2 will be always invoked by EventExecutor 'e1'. - ch.pipeline().addLast(e1, h2); + ch.pipeline().addLast(e1.next(), h2); // h3 will be always invoked by EventExecutor 'e2'. - ch.pipeline().addLast(e2, h3); + ch.pipeline().addLast(e2.next(), h3); ch.register().sync().channel().connect(localAddr).sync(); @@ -247,11 +247,11 @@ public class LocalTransportThreadModelTest { // inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null // outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null ch.pipeline().addLast(h1) - .addLast(e1, h2) - .addLast(e2, h3) - .addLast(e3, h4) - .addLast(e4, h5) - .addLast(e5, h6); + .addLast(e1.next(), h2) + .addLast(e2.next(), h3) + .addLast(e3.next(), h4) + .addLast(e4.next(), h5) + .addLast(e5.next(), h6); ch.register().sync().channel().connect(localAddr).sync(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index 889465fde2..4567bbc9bc 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -137,12 +137,12 @@ public class LocalTransportThreadModelTest3 { if (!inbound) { ch.config().setAutoRead(false); } - ch.pipeline().addLast(e1, h1) - .addLast(e1, h2) - .addLast(e1, h3) - .addLast(e1, h4) - .addLast(e1, h5) - .addLast(e1, "recorder", h6); + ch.pipeline().addLast(e1.next(), h1) + .addLast(e1.next(), h2) + .addLast(e1.next(), h3) + .addLast(e1.next(), h4) + .addLast(e1.next(), h5) + .addLast(e1.next(), "recorder", h6); ch.register().sync().channel().connect(localAddr).sync(); @@ -166,7 +166,7 @@ public class LocalTransportThreadModelTest3 { } //EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)]; ChannelHandler handler = ch.pipeline().removeFirst(); - ch.pipeline().addBefore(groups[random.nextInt(groups.length)], "recorder", + ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder", UUID.randomUUID().toString(), handler); } }