diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index 9fefad5135..605b8e7463 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -128,6 +128,9 @@ public class ChannelOption extends AbstractConstant> { public static final ChannelOption DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION = valueOf("DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION"); + public static final ChannelOption SINGLE_EVENTEXECUTOR_PER_GROUP = + valueOf("SINGLE_EVENTEXECUTOR_PER_GROUP"); + /** * Creates a new {@link ChannelOption} with the specified unique {@code name}. */ diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index e1f4b22e6f..755ab789e2 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -30,6 +30,7 @@ import static io.netty.channel.ChannelOption.AUTO_READ; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ; import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR; +import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR; import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK; import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK; @@ -77,6 +78,7 @@ public class DefaultChannelConfig implements ChannelConfig { private volatile int autoRead = 1; private volatile boolean autoClose = true; private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; + private volatile boolean pinEventExecutor = true; public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); @@ -94,7 +96,8 @@ public class DefaultChannelConfig implements ChannelConfig { null, CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK, - WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR); + WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR, + SINGLE_EVENTEXECUTOR_PER_GROUP); } protected Map, Object> getOptions( @@ -165,6 +168,9 @@ public class DefaultChannelConfig implements ChannelConfig { if (option == MESSAGE_SIZE_ESTIMATOR) { return (T) getMessageSizeEstimator(); } + if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { + return (T) Boolean.valueOf(getPinEventExecutorPerGroup()); + } return null; } @@ -195,6 +201,8 @@ public class DefaultChannelConfig implements ChannelConfig { setWriteBufferWaterMark((WriteBufferWaterMark) value); } else if (option == MESSAGE_SIZE_ESTIMATOR) { setMessageSizeEstimator((MessageSizeEstimator) value); + } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { + setPinEventExecutorPerGroup((Boolean) value); } else { return false; } @@ -430,4 +438,14 @@ public class DefaultChannelConfig implements ChannelConfig { msgSizeEstimator = estimator; return this; } + + private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) { + this.pinEventExecutor = pinEventExecutor; + return this; + } + + private boolean getPinEventExecutorPerGroup() { + return pinEventExecutor; + } + } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 0939676a3a..49c3077e3b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -114,6 +114,10 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (group == null) { return null; } + Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP); + if (pinEventExecutor != null && !pinEventExecutor) { + return group.next(); + } Map childExecutors = this.childExecutors; if (childExecutors == null) { // Use size of 4 as most people only use one extra EventExecutor. @@ -128,7 +132,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { } return childExecutor; } - @Override public final Channel channel() { return channel; diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 397b9d610c..1a4faa4401 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -932,6 +932,44 @@ public class DefaultChannelPipelineTest { } } + @Test + public void testPinExecutor() { + EventExecutorGroup group = new DefaultEventExecutorGroup(2); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + ChannelPipeline pipeline2 = new LocalChannel().pipeline(); + + pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter()); + pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter()); + pipeline2.addLast(group, "h3", new ChannelInboundHandlerAdapter()); + + EventExecutor executor1 = pipeline.context("h1").executor(); + EventExecutor executor2 = pipeline.context("h2").executor(); + assertNotNull(executor1); + assertNotNull(executor2); + assertSame(executor1, executor2); + EventExecutor executor3 = pipeline2.context("h3").executor(); + assertNotNull(executor3); + assertNotSame(executor3, executor2); + group.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + + @Test + public void testNotPinExecutor() { + EventExecutorGroup group = new DefaultEventExecutorGroup(2); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + pipeline.channel().config().setOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false); + + pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter()); + pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter()); + + EventExecutor executor1 = pipeline.context("h1").executor(); + EventExecutor executor2 = pipeline.context("h2").executor(); + assertNotNull(executor1); + assertNotNull(executor2); + assertNotSame(executor1, executor2); + group.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + private static final class TestTask implements Runnable { private final ChannelPipeline pipeline;