Leave responsibility to choose EventExecutor to the user (#8746)

Motivation:

We should leave the responsibility to choose the EventExecutor for a ChannelHandler to the user for more flexibility and to keep things simple.

Modification:

- Change method signatures to take an EventExecutor and not an EventExecutorGroup
- Remove special ChannelOption that allowed to enable / disable EventExecutor pinning

Result:

Simpler and more flexible code.
This commit is contained in:
Norman Maurer 2019-01-23 07:44:32 +01:00 committed by GitHub
parent 7b92ff2500
commit 5cfb107822
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 70 additions and 114 deletions

View File

@ -132,13 +132,13 @@ public class SocketEchoTest extends AbstractSocketTest {
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel c) throws Exception {
c.pipeline().addLast(group, sh);
c.pipeline().addLast(group.next(), sh);
}
});
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel c) throws Exception {
c.pipeline().addLast(group, ch);
c.pipeline().addLast(group.next(), ch);
}
});
} else {

View File

@ -159,7 +159,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
ChannelPipeline p = sch.pipeline();
p.addLast("logger", new LoggingHandler(LOG_LEVEL));
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
p.addLast(executor, sh);
p.addLast(executor.next(), sh);
}
});
@ -169,7 +169,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
ChannelPipeline p = sch.pipeline();
p.addLast("logger", new LoggingHandler(LOG_LEVEL));
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
p.addLast(executor, ch);
p.addLast(executor.next(), ch);
}
});

View File

@ -126,9 +126,6 @@ public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
public static final ChannelOption<Boolean> DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION =
valueOf("DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION");
public static final ChannelOption<Boolean> SINGLE_EVENTEXECUTOR_PER_GROUP =
valueOf("SINGLE_EVENTEXECUTOR_PER_GROUP");
/**
* Creates a new {@link ChannelOption} with the specified unique {@code name}.
*/

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.SocketAddress;
@ -204,7 +205,7 @@ import java.util.NoSuchElementException;
* // a time-consuming task.
* // If your business logic is fully asynchronous or finished very quickly, you don't
* // need to specify a group.
* pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
* pipeline.addLast(group.next(), "handler", new MyBusinessLogicHandler());
* </pre>
*
* <h3>Thread safety</h3>
@ -232,7 +233,7 @@ public interface ChannelPipeline
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}
* methods
* @param name the name of the handler to insert first
* @param handler the handler to insert first
@ -242,7 +243,7 @@ public interface ChannelPipeline
* @throws NullPointerException
* if the specified handler is {@code null}
*/
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
@ -260,7 +261,7 @@ public interface ChannelPipeline
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}
* methods
* @param name the name of the handler to append
* @param handler the handler to append
@ -270,7 +271,7 @@ public interface ChannelPipeline
* @throws NullPointerException
* if the specified handler is {@code null}
*/
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
@ -293,7 +294,7 @@ public interface ChannelPipeline
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}
* methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert before
@ -306,7 +307,7 @@ public interface ChannelPipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
*/
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
@ -329,7 +330,7 @@ public interface ChannelPipeline
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}
* methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after
@ -342,7 +343,7 @@ public interface ChannelPipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
*/
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler);
/**
* Inserts {@link ChannelHandler}s at the first position of this pipeline.
@ -355,12 +356,12 @@ public interface ChannelPipeline
/**
* Inserts {@link ChannelHandler}s at the first position of this pipeline.
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}s
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s
* methods.
* @param handlers the handlers to insert first
*
*/
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers);
/**
* Inserts {@link ChannelHandler}s at the last position of this pipeline.
@ -373,12 +374,12 @@ public interface ChannelPipeline
/**
* Inserts {@link ChannelHandler}s at the last position of this pipeline.
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}s
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s
* methods.
* @param handlers the handlers to insert last
*
*/
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers);
/**
* Removes the specified {@link ChannelHandler} from this pipeline.

View File

@ -30,7 +30,6 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
@ -81,8 +80,7 @@ public class DefaultChannelConfig implements ChannelConfig {
null,
CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
SINGLE_EVENTEXECUTOR_PER_GROUP);
WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR);
}
protected Map<ChannelOption<?>, Object> getOptions(
@ -153,9 +151,6 @@ public class DefaultChannelConfig implements ChannelConfig {
if (option == MESSAGE_SIZE_ESTIMATOR) {
return (T) getMessageSizeEstimator();
}
if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
}
return null;
}
@ -186,8 +181,6 @@ public class DefaultChannelConfig implements ChannelConfig {
setWriteBufferWaterMark((WriteBufferWaterMark) value);
} else if (option == MESSAGE_SIZE_ESTIMATOR) {
setMessageSizeEstimator((MessageSizeEstimator) value);
} else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
setPinEventExecutorPerGroup((Boolean) value);
} else {
return false;
}
@ -426,14 +419,4 @@ public class DefaultChannelConfig implements ChannelConfig {
msgSizeEstimator = estimator;
return this;
}
private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
this.pinEventExecutor = pinEventExecutor;
return this;
}
private boolean getPinEventExecutorPerGroup() {
return pinEventExecutor;
}
}

View File

@ -19,7 +19,6 @@ import io.netty.channel.Channel.Unsafe;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;
@ -29,7 +28,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -68,7 +66,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
protected DefaultChannelPipeline(Channel channel) {
@ -98,32 +95,10 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return touch ? ReferenceCountUtil.touch(msg, next) : msg;
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
private AbstractChannelHandlerContext newContext(EventExecutor executor, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, executor, name, handler);
}
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<>(4);
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
@Override
public final Channel channel() {
return channel;
@ -135,20 +110,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
public final ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
newCtx = newContext(group, name, handler);
newCtx = newContext(executor, name, handler);
addFirst0(newCtx);
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
EventExecutor ctxExecutor = newCtx.executor();
if (!ctxExecutor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
ctxExecutor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
@ -175,19 +150,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
public final ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
newCtx = newContext(executor, filterName(name, handler), handler);
addLast0(newCtx);
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
EventExecutor ctxExecutor = newCtx.executor();
if (!ctxExecutor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
ctxExecutor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
@ -215,7 +190,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
EventExecutor executor, String baseName, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
synchronized (this) {
@ -223,14 +198,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
name = filterName(name, handler);
ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler);
newCtx = newContext(executor, name, handler);
addBefore0(ctx, newCtx);
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
EventExecutor ctxExecutor = newCtx.executor();
if (!ctxExecutor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
ctxExecutor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
@ -265,7 +240,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
EventExecutor executor, String baseName, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
@ -274,14 +249,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
name = filterName(name, handler);
ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler);
newCtx = newContext(executor, name, handler);
addAfter0(ctx, newCtx);
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
EventExecutor ctxExecutor = newCtx.executor();
if (!ctxExecutor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
ctxExecutor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
@ -311,7 +286,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
public final ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
@ -344,7 +319,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
public final ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}

View File

@ -49,7 +49,7 @@ public abstract class AbstractEventLoopTest {
@Override
public void initChannel(ServerSocketChannel ch) throws Exception {
ch.pipeline().addLast(new TestChannelHandler());
ch.pipeline().addLast(eventExecutorGroup, new TestChannelHandler2());
ch.pipeline().addLast(eventExecutorGroup.next(), new TestChannelHandler2());
}
})
.bind(0).awaitUninterruptibly();

View File

@ -776,8 +776,8 @@ public class DefaultChannelPipelineTest {
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(handler1);
pipeline.channel().register().syncUninterruptibly();
pipeline.addLast(group1, handler2);
pipeline.addLast(group2, handler3);
pipeline.addLast(group1.next(), handler2);
pipeline.addLast(group2.next(), handler3);
pipeline.addLast(handler4);
assertTrue(removedQueue.isEmpty());
@ -809,7 +809,7 @@ public class DefaultChannelPipelineTest {
final Promise<Void> promise = group1.next().newPromise();
final Exception exception = new RuntimeException();
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise));
pipeline.addFirst(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
@ -838,7 +838,7 @@ public class DefaultChannelPipelineTest {
throw exception;
}
});
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise));
pipeline.channel().register().syncUninterruptibly();
pipeline.remove(handlerName);
promise.syncUninterruptibly();
@ -858,7 +858,7 @@ public class DefaultChannelPipelineTest {
final Exception exceptionRemoved = new RuntimeException();
String handlerName = "foo";
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(group1, new CheckExceptionHandler(exceptionAdded, promise));
pipeline.addLast(group1.next(), new CheckExceptionHandler(exceptionAdded, promise));
pipeline.addFirst(handlerName, new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
@ -1043,7 +1043,7 @@ public class DefaultChannelPipelineTest {
pipeline1.channel().register().syncUninterruptibly();
final CountDownLatch latch = new CountDownLatch(1);
pipeline1.addLast(eventExecutors, new ChannelInboundHandlerAdapter() {
pipeline1.addLast(eventExecutors.next(), new ChannelInboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Just block one of the two threads.
@ -1071,9 +1071,10 @@ public class DefaultChannelPipelineTest {
ChannelPipeline pipeline = newLocalChannel().pipeline();
ChannelPipeline pipeline2 = newLocalChannel().pipeline();
pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());
pipeline2.addLast(group, "h3", new ChannelInboundHandlerAdapter());
EventExecutor executor = group.next();
pipeline.addLast(executor, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(executor, "h2", new ChannelInboundHandlerAdapter());
pipeline2.addLast(group.next(), "h3", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();
@ -1090,10 +1091,9 @@ public class DefaultChannelPipelineTest {
public void testNotPinExecutor() {
EventExecutorGroup group = new DefaultEventExecutorGroup(2);
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.channel().config().setOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false);
pipeline.addLast(group, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group, "h2", new ChannelInboundHandlerAdapter());
pipeline.addLast(group.next(), "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group.next(), "h2", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();

View File

@ -95,9 +95,9 @@ public class LocalTransportThreadModelTest {
// With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'.
ch.pipeline().addLast(h1);
// h2 will be always invoked by EventExecutor 'e1'.
ch.pipeline().addLast(e1, h2);
ch.pipeline().addLast(e1.next(), h2);
// h3 will be always invoked by EventExecutor 'e2'.
ch.pipeline().addLast(e2, h3);
ch.pipeline().addLast(e2.next(), h3);
ch.register().sync().channel().connect(localAddr).sync();
@ -247,11 +247,11 @@ public class LocalTransportThreadModelTest {
// inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null
// outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null
ch.pipeline().addLast(h1)
.addLast(e1, h2)
.addLast(e2, h3)
.addLast(e3, h4)
.addLast(e4, h5)
.addLast(e5, h6);
.addLast(e1.next(), h2)
.addLast(e2.next(), h3)
.addLast(e3.next(), h4)
.addLast(e4.next(), h5)
.addLast(e5.next(), h6);
ch.register().sync().channel().connect(localAddr).sync();

View File

@ -137,12 +137,12 @@ public class LocalTransportThreadModelTest3 {
if (!inbound) {
ch.config().setAutoRead(false);
}
ch.pipeline().addLast(e1, h1)
.addLast(e1, h2)
.addLast(e1, h3)
.addLast(e1, h4)
.addLast(e1, h5)
.addLast(e1, "recorder", h6);
ch.pipeline().addLast(e1.next(), h1)
.addLast(e1.next(), h2)
.addLast(e1.next(), h3)
.addLast(e1.next(), h4)
.addLast(e1.next(), h5)
.addLast(e1.next(), "recorder", h6);
ch.register().sync().channel().connect(localAddr).sync();
@ -166,7 +166,7 @@ public class LocalTransportThreadModelTest3 {
}
//EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)];
ChannelHandler handler = ch.pipeline().removeFirst();
ch.pipeline().addBefore(groups[random.nextInt(groups.length)], "recorder",
ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder",
UUID.randomUUID().toString(), handler);
}
}