From 061899f2a7b4fb1f6c1f3dfe723109e863c9a35d Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 5 Jul 2016 20:32:44 +0200 Subject: [PATCH] Allow to remove pinning of EventExecutor for EventExecutorGroup Motivation: We pinned the EventExecutor for a Channel in DefaultChannelPipeline. Which means if the user added multiple handlers with the same EventExecutorGroup to the ChannelPipeline it will use the same EventExecutor for all of these handlers. This may be unexpected and even not what the user wants. If the user want to use the same one for all of them it can be done by obtain an EventExecutor and pass the same instance to the add methods. Because of this we should allow to not pin. Modifications: Allow to disable pinning of EventExecutor for Channel based on EventExecutorGroup via ChannelOption. Result: Less confusing and more flexible usage of EventExecutorGroup when adding ChannelHandlers to the ChannelPipeline. --- .../java/io/netty/channel/ChannelOption.java | 3 ++ .../netty/channel/DefaultChannelConfig.java | 20 +++++++++- .../netty/channel/DefaultChannelPipeline.java | 5 ++- .../channel/DefaultChannelPipelineTest.java | 38 +++++++++++++++++++ 4 files changed, 64 insertions(+), 2 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index 9fefad5135..605b8e7463 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -128,6 +128,9 @@ public class ChannelOption extends AbstractConstant> { public static final ChannelOption DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION = valueOf("DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION"); + public static final ChannelOption SINGLE_EVENTEXECUTOR_PER_GROUP = + valueOf("SINGLE_EVENTEXECUTOR_PER_GROUP"); + /** * Creates a new {@link ChannelOption} with the specified unique {@code name}. */ diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index e1f4b22e6f..755ab789e2 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -30,6 +30,7 @@ import static io.netty.channel.ChannelOption.AUTO_READ; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ; import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR; +import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR; import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK; import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK; @@ -77,6 +78,7 @@ public class DefaultChannelConfig implements ChannelConfig { private volatile int autoRead = 1; private volatile boolean autoClose = true; private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; + private volatile boolean pinEventExecutor = true; public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); @@ -94,7 +96,8 @@ public class DefaultChannelConfig implements ChannelConfig { null, CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK, - WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR); + WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR, + SINGLE_EVENTEXECUTOR_PER_GROUP); } protected Map, Object> getOptions( @@ -165,6 +168,9 @@ public class DefaultChannelConfig implements ChannelConfig { if (option == MESSAGE_SIZE_ESTIMATOR) { return (T) getMessageSizeEstimator(); } + if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { + return (T) Boolean.valueOf(getPinEventExecutorPerGroup()); + } return null; } @@ -195,6 +201,8 @@ public class DefaultChannelConfig implements ChannelConfig { setWriteBufferWaterMark((WriteBufferWaterMark) value); } else if (option == MESSAGE_SIZE_ESTIMATOR) { setMessageSizeEstimator((MessageSizeEstimator) value); + } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { + setPinEventExecutorPerGroup((Boolean) value); } else { return false; } @@ -430,4 +438,14 @@ public class DefaultChannelConfig implements ChannelConfig { msgSizeEstimator = estimator; return this; } + + private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) { + this.pinEventExecutor = pinEventExecutor; + return this; + } + + private boolean getPinEventExecutorPerGroup() { + return pinEventExecutor; + } + } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 0939676a3a..49c3077e3b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -114,6 +114,10 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (group == null) { return null; } + Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP); + if (pinEventExecutor != null && !pinEventExecutor) { + return group.next(); + } Map childExecutors = this.childExecutors; if (childExecutors == null) { // Use size of 4 as most people only use one extra EventExecutor. @@ -128,7 +132,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { } return childExecutor; } - @Override public final Channel channel() { return channel; diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 397b9d610c..1a4faa4401 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -932,6 +932,44 @@ public class DefaultChannelPipelineTest { } } + @Test + public void testPinExecutor() { + EventExecutorGroup group = new DefaultEventExecutorGroup(2); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + ChannelPipeline pipeline2 = new LocalChannel().pipeline(); + + pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter()); + pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter()); + pipeline2.addLast(group, "h3", new ChannelInboundHandlerAdapter()); + + EventExecutor executor1 = pipeline.context("h1").executor(); + EventExecutor executor2 = pipeline.context("h2").executor(); + assertNotNull(executor1); + assertNotNull(executor2); + assertSame(executor1, executor2); + EventExecutor executor3 = pipeline2.context("h3").executor(); + assertNotNull(executor3); + assertNotSame(executor3, executor2); + group.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + + @Test + public void testNotPinExecutor() { + EventExecutorGroup group = new DefaultEventExecutorGroup(2); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + pipeline.channel().config().setOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false); + + pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter()); + pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter()); + + EventExecutor executor1 = pipeline.context("h1").executor(); + EventExecutor executor2 = pipeline.context("h2").executor(); + assertNotNull(executor1); + assertNotNull(executor2); + assertNotSame(executor1, executor2); + group.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + private static final class TestTask implements Runnable { private final ChannelPipeline pipeline;