Allow to remove pinning of EventExecutor for EventExecutorGroup

Motivation:

We pinned the EventExecutor for a Channel in DefaultChannelPipeline. Which means if the user added multiple handlers with the same EventExecutorGroup to the ChannelPipeline it will use the same EventExecutor for all of these handlers. This may be unexpected and even not what the user wants. If the user want to use the same one for all of them it can be done by obtain an EventExecutor and pass the same instance to the add methods. Because of this we should allow to not pin.

Modifications:

Allow to disable pinning of EventExecutor for Channel based on EventExecutorGroup via ChannelOption.

Result:

Less confusing and more flexible usage of EventExecutorGroup when adding ChannelHandlers to the ChannelPipeline.
This commit is contained in:
Norman Maurer 2016-07-05 20:32:44 +02:00
parent 64bf167423
commit 061899f2a7
4 changed files with 64 additions and 2 deletions

View File

@ -128,6 +128,9 @@ public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
public static final ChannelOption<Boolean> DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION =
valueOf("DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION");
public static final ChannelOption<Boolean> SINGLE_EVENTEXECUTOR_PER_GROUP =
valueOf("SINGLE_EVENTEXECUTOR_PER_GROUP");
/**
* Creates a new {@link ChannelOption} with the specified unique {@code name}.
*/

View File

@ -30,6 +30,7 @@ import static io.netty.channel.ChannelOption.AUTO_READ;
import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
@ -77,6 +78,7 @@ public class DefaultChannelConfig implements ChannelConfig {
private volatile int autoRead = 1;
private volatile boolean autoClose = true;
private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
private volatile boolean pinEventExecutor = true;
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
@ -94,7 +96,8 @@ public class DefaultChannelConfig implements ChannelConfig {
null,
CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR);
WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
SINGLE_EVENTEXECUTOR_PER_GROUP);
}
protected Map<ChannelOption<?>, Object> getOptions(
@ -165,6 +168,9 @@ public class DefaultChannelConfig implements ChannelConfig {
if (option == MESSAGE_SIZE_ESTIMATOR) {
return (T) getMessageSizeEstimator();
}
if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
}
return null;
}
@ -195,6 +201,8 @@ public class DefaultChannelConfig implements ChannelConfig {
setWriteBufferWaterMark((WriteBufferWaterMark) value);
} else if (option == MESSAGE_SIZE_ESTIMATOR) {
setMessageSizeEstimator((MessageSizeEstimator) value);
} else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
setPinEventExecutorPerGroup((Boolean) value);
} else {
return false;
}
@ -430,4 +438,14 @@ public class DefaultChannelConfig implements ChannelConfig {
msgSizeEstimator = estimator;
return this;
}
private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
this.pinEventExecutor = pinEventExecutor;
return this;
}
private boolean getPinEventExecutorPerGroup() {
return pinEventExecutor;
}
}

View File

@ -114,6 +114,10 @@ public class DefaultChannelPipeline implements ChannelPipeline {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
@ -128,7 +132,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
return childExecutor;
}
@Override
public final Channel channel() {
return channel;

View File

@ -932,6 +932,44 @@ public class DefaultChannelPipelineTest {
}
}
@Test
public void testPinExecutor() {
EventExecutorGroup group = new DefaultEventExecutorGroup(2);
ChannelPipeline pipeline = new LocalChannel().pipeline();
ChannelPipeline pipeline2 = new LocalChannel().pipeline();
pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());
pipeline2.addLast(group, "h3", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();
assertNotNull(executor1);
assertNotNull(executor2);
assertSame(executor1, executor2);
EventExecutor executor3 = pipeline2.context("h3").executor();
assertNotNull(executor3);
assertNotSame(executor3, executor2);
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
@Test
public void testNotPinExecutor() {
EventExecutorGroup group = new DefaultEventExecutorGroup(2);
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.channel().config().setOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false);
pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();
assertNotNull(executor1);
assertNotNull(executor2);
assertNotSame(executor1, executor2);
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
private static final class TestTask implements Runnable {
private final ChannelPipeline pipeline;