Remove ability to specify a custom EventExecutor when adding handlers… (#8778)

Motiviation:

In the past we allowed to use different EventExecutors for different ChannelHandlers in the ChannelPipeline. This introduced a lot of complexity while not providing much gain. Also it made the pipeline racy in terms of adding / remove handlers in some situations. This feature is not really used in the wild and can be easily archived by offloading heavy logic to an Executor by the user itself.

Modifications:

- Remove the ability to provide custom EventExecutor when adding handlers to the pipeline.
- Remove testcode that is not needed any more
- Ensure a handler is correctly visible in the pipeline when asked for it by the user while not be used until the EventLoop runs. This ensures correct ordering and visibility.
- Correctly remove ChannelHandlers from pipeline when scheduling of handlerAdded(...) callbacks fail.

Result:

Remove races in DefaultChannelPipeline and simplify implementation of AbstractChannelHandlerContext.
This commit is contained in:
Norman Maurer 2019-01-30 13:34:20 +01:00
parent d74682f123
commit 8e72071d76
13 changed files with 755 additions and 2444 deletions

View File

@ -22,13 +22,8 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
@ -42,29 +37,17 @@ public class SocketEchoTest extends AbstractSocketTest {
private static final Random random = new Random();
static final byte[] data = new byte[1048576];
private static EventExecutorGroup group;
static {
random.nextBytes(data);
}
@BeforeClass
public static void createGroup() {
group = new DefaultEventExecutorGroup(2);
}
@AfterClass
public static void destroyGroup() throws Exception {
group.shutdownGracefully().sync();
}
@Test(timeout = 30000)
public void testSimpleEcho() throws Throwable {
run();
}
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, false, true);
testSimpleEcho0(sb, cb, false, true);
}
@Test(timeout = 30000)
@ -73,25 +56,7 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, false, false);
}
@Test//(timeout = 30000)
public void testSimpleEchoWithAdditionalExecutor() throws Throwable {
run();
}
public void testSimpleEchoWithAdditionalExecutor(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, false, true);
}
@Test//(timeout = 30000)
public void testSimpleEchoWithAdditionalExecutorNotAutoRead() throws Throwable {
run();
}
public void testSimpleEchoWithAdditionalExecutorNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, false, false);
testSimpleEcho0(sb, cb, false, false);
}
@Test//(timeout = 30000)
@ -100,7 +65,7 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, true, true);
testSimpleEcho0(sb, cb, true, true);
}
@Test//(timeout = 30000)
@ -109,48 +74,24 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, false, true, false);
}
@Test(timeout = 30000)
public void testSimpleEchoWithAdditionalExecutorAndVoidPromise() throws Throwable {
run();
}
public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, true, true, true);
testSimpleEcho0(sb, cb, true, false);
}
private static void testSimpleEcho0(
ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise, boolean autoRead)
ServerBootstrap sb, Bootstrap cb, boolean voidPromise, boolean autoRead)
throws Throwable {
final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(autoRead);
if (additionalExecutor) {
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel c) throws Exception {
c.pipeline().addLast(group.next(), sh);
}
});
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel c) throws Exception {
c.pipeline().addLast(group.next(), ch);
}
});
} else {
sb.childHandler(sh);
sb.handler(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
});
cb.handler(ch);
}
sb.childHandler(sh);
sb.handler(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
});
cb.handler(ch);
sb.childOption(ChannelOption.AUTO_READ, autoRead);
cb.option(ChannelOption.AUTO_READ, autoRead);

View File

@ -35,13 +35,9 @@ import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -66,7 +62,6 @@ public class SocketStartTlsTest extends AbstractSocketTest {
private static final LogLevel LOG_LEVEL = LogLevel.TRACE;
private static final File CERT_FILE;
private static final File KEY_FILE;
private static EventExecutorGroup executor;
static {
SelfSignedCertificate ssc;
@ -106,16 +101,6 @@ public class SocketStartTlsTest extends AbstractSocketTest {
return params;
}
@BeforeClass
public static void createExecutor() {
executor = new DefaultEventExecutorGroup(2);
}
@AfterClass
public static void shutdownExecutor() throws Exception {
executor.shutdownGracefully().sync();
}
private final SslContext serverCtx;
private final SslContext clientCtx;
@ -146,7 +131,6 @@ public class SocketStartTlsTest extends AbstractSocketTest {
sb.childOption(ChannelOption.AUTO_READ, autoRead);
cb.option(ChannelOption.AUTO_READ, autoRead);
final EventExecutorGroup executor = SocketStartTlsTest.executor;
SSLEngine sse = serverCtx.newEngine(PooledByteBufAllocator.DEFAULT);
SSLEngine cse = clientCtx.newEngine(PooledByteBufAllocator.DEFAULT);
@ -159,7 +143,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
ChannelPipeline p = sch.pipeline();
p.addLast("logger", new LoggingHandler(LOG_LEVEL));
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
p.addLast(executor.next(), sh);
p.addLast(sh);
}
});
@ -169,7 +153,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
ChannelPipeline p = sch.pipeline();
p.addLast("logger", new LoggingHandler(LOG_LEVEL));
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
p.addLast(executor.next(), ch);
p.addLast(ch);
}
});

View File

@ -60,6 +60,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final ChannelId id;
private final Unsafe unsafe;
private final ChannelPipeline pipeline;
private final ChannelFuture succeedFuture;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
@ -83,6 +84,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent;
this.eventLoop = validateEventLoop(eventLoop);
id = newId();
succeedFuture = new SucceededChannelFuture(this, eventLoop);
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
@ -97,6 +99,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent;
this.eventLoop = validateEventLoop(eventLoop);
this.id = id;
succeedFuture = new SucceededChannelFuture(this, eventLoop);
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
@ -325,22 +328,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public ChannelPromise newPromise() {
return pipeline.newPromise();
return new DefaultChannelPromise(this, eventLoop);
}
@Override
public ChannelProgressivePromise newProgressivePromise() {
return pipeline.newProgressivePromise();
return new DefaultChannelProgressivePromise(this, eventLoop);
}
@Override
public ChannelFuture newSucceededFuture() {
return pipeline.newSucceededFuture();
return succeedFuture;
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return pipeline.newFailedFuture(cause);
return new FailedChannelFuture(this, eventLoop, cause);
}
@Override

View File

@ -24,7 +24,6 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.OrderedEventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.ThrowableUtil;
@ -45,24 +44,20 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
AbstractChannelHandlerContext next;
AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
private static final int ADD_PENDING = 1;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
private static final int ADD_COMPLETE = 2;
private static final int ADD_COMPLETE = 1;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int REMOVE_COMPLETE = 3;
private static final int REMOVE_COMPLETE = 2;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
@ -107,11 +102,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
private final DefaultChannelPipeline pipeline;
private final String name;
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
@ -120,14 +111,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, String name,
Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
/**
@ -248,15 +236,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return channel().config().getAllocator();
}
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
@Override
public String name() {
return name;
@ -264,126 +243,105 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
EventExecutor executor = executor();
if (executor.inEventLoop()) {
findAndInvokeChannelRegistered();
} else {
executor.execute(this::findAndInvokeChannelRegistered);
}
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(next::invokeChannelRegistered);
}
private void findAndInvokeChannelRegistered() {
findContextInbound(MASK_CHANNEL_REGISTERED).invokeChannelRegistered();
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
final void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
EventExecutor executor = executor();
if (executor.inEventLoop()) {
findAndInvokeChannelUnregistered();
} else {
executor.execute(this::findAndInvokeChannelUnregistered);
}
return this;
}
static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelUnregistered();
} else {
executor.execute(next::invokeChannelUnregistered);
}
private void findAndInvokeChannelUnregistered() {
findContextInbound(MASK_CHANNEL_UNREGISTERED).invokeChannelUnregistered();
}
private void invokeChannelUnregistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelUnregistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelUnregistered();
final void invokeChannelUnregistered() {
try {
((ChannelInboundHandler) handler()).channelUnregistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
EventExecutor executor = executor();
if (executor.inEventLoop()) {
findAndInvokeChannelActive();
} else {
executor.execute(this::findAndInvokeChannelActive);
}
return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(next::invokeChannelActive);
}
private void findAndInvokeChannelActive() {
findContextInbound(MASK_CHANNEL_ACTIVE).invokeChannelActive();
}
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
final void invokeChannelActive() {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelInactive() {
invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
EventExecutor executor = executor();
if (executor.inEventLoop()) {
findAndInvokeChannelInactive();
} else {
executor.execute(this::findAndInvokeChannelInactive);
}
return this;
}
static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelInactive();
} else {
executor.execute(next::invokeChannelInactive);
}
private void findAndInvokeChannelInactive() {
findContextInbound(MASK_CHANNEL_INACTIVE).invokeChannelInactive();
}
private void invokeChannelInactive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelInactive();
final void invokeChannelInactive() {
try {
((ChannelInboundHandler) handler()).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
return this;
}
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause);
findAndInvokeExceptionCaught(cause);
} else {
try {
executor.execute(() -> next.invokeExceptionCaught(cause));
executor.execute(() -> findAndInvokeExceptionCaught(cause));
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
@ -391,146 +349,137 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
}
}
return this;
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug(
private void findAndInvokeExceptionCaught(Throwable cause) {
findContextInbound(MASK_EXCEPTION_CAUGHT).invokeExceptionCaught(cause);
}
final void invokeExceptionCaught(final Throwable cause) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug(
"An exception {}" +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:",
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn(
} else if (logger.isWarnEnabled()) {
logger.warn(
"An exception '{}' [enable DEBUG level for full stacktrace] " +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:", error, cause);
}
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:", error, cause);
}
} else {
fireExceptionCaught(cause);
}
}
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
public ChannelHandlerContext fireUserEventTriggered(Object event) {
ObjectUtil.checkNotNull(event, "event");
EventExecutor executor = executor();
if (executor.inEventLoop()) {
findAndInvokeUserEventTriggered(event);
} else {
executor.execute(() -> findAndInvokeUserEventTriggered(event));
}
return this;
}
static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
ObjectUtil.checkNotNull(event, "event");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
executor.execute(() -> next.invokeUserEventTriggered(event));
}
private void findAndInvokeUserEventTriggered(Object event) {
findContextInbound(MASK_USER_EVENT_TRIGGERED).invokeUserEventTriggered(event);
}
private void invokeUserEventTriggered(Object event) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireUserEventTriggered(event);
final void invokeUserEventTriggered(Object event) {
try {
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
ObjectUtil.checkNotNull(msg, "msg");
EventExecutor executor = executor();
if (executor.inEventLoop()) {
findAndInvokeChannelRead(msg);
} else {
try {
executor.execute(() -> findAndInvokeChannelRead(msg));
} catch (Throwable cause) {
ReferenceCountUtil.release(msg);
throw cause;
}
}
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(() -> next.invokeChannelRead(m));
}
private void findAndInvokeChannelRead(Object msg) {
findContextInbound(MASK_CHANNEL_READ).invokeChannelRead(msg);
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
final void invokeChannelRead(Object msg) {
final Object m = pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), this);
try {
((ChannelInboundHandler) handler()).channelRead(this, m);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
return this;
}
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeChannelReadComplete();
findAndInvokeChannelReadComplete();
} else {
Tasks tasks = next.invokeTasks;
Tasks tasks = invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
invokeTasks = tasks = new Tasks(this);
}
executor.execute(tasks.invokeChannelReadCompleteTask);
}
return this;
}
private void invokeChannelReadComplete() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelReadComplete(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelReadComplete();
private void findAndInvokeChannelReadComplete() {
findContextInbound(MASK_CHANNEL_READ_COMPLETE).invokeChannelReadComplete();
}
final void invokeChannelReadComplete() {
try {
((ChannelInboundHandler) handler()).channelReadComplete(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
return this;
}
static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeChannelWritabilityChanged();
findAndInvokeChannelWritabilityChanged();
} else {
Tasks tasks = next.invokeTasks;
Tasks tasks = invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
invokeTasks = tasks = new Tasks(this);
}
executor.execute(tasks.invokeChannelWritableStateChangedTask);
}
return this;
}
private void invokeChannelWritabilityChanged() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelWritabilityChanged();
private void findAndInvokeChannelWritabilityChanged() {
findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED).invokeChannelWritabilityChanged();
}
final void invokeChannelWritabilityChanged() {
try {
((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@ -579,25 +528,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
findAndInvokeBind(localAddress, promise);
} else {
safeExecute(executor, () -> next.invokeBind(localAddress, promise), promise, null);
safeExecute(executor, () -> findAndInvokeBind(localAddress, promise), promise, null);
}
return promise;
}
private void findAndInvokeBind(SocketAddress localAddress, ChannelPromise promise) {
findContextOutbound(MASK_BIND).invokeBind(localAddress, promise);
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@ -618,25 +566,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
findAndInvokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, () -> next.invokeConnect(remoteAddress, localAddress, promise), promise, null);
safeExecute(executor, () -> findAndInvokeConnect(remoteAddress, localAddress, promise), promise, null);
}
return promise;
}
private void findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
findContextOutbound(MASK_CONNECT).invokeConnect(remoteAddress, localAddress, promise);
}
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@ -647,37 +594,38 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior.
if (!channel().metadata().hasDisconnect()) {
next.invokeClose(promise);
findAndInvokeClose(promise);
} else {
next.invokeDisconnect(promise);
findAndInvokeDisconnect(promise);
}
} else {
safeExecute(executor, () -> {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior.
if (!channel().metadata().hasDisconnect()) {
next.invokeClose(promise);
findAndInvokeClose(promise);
} else {
next.invokeDisconnect(promise);
findAndInvokeDisconnect(promise);
}
}, promise, null);
}
return promise;
}
private void findAndInvokeDisconnect(ChannelPromise promise) {
findContextOutbound(MASK_DISCONNECT).invokeDisconnect(promise);
}
private void invokeDisconnect(ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).disconnect(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
disconnect(promise);
try {
((ChannelOutboundHandler) handler()).disconnect(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@ -688,26 +636,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeClose(promise);
findAndInvokeClose(promise);
} else {
safeExecute(executor, () -> next.invokeClose(promise), promise, null);
safeExecute(executor, () -> findAndInvokeClose(promise), promise, null);
}
return promise;
}
private void findAndInvokeClose(ChannelPromise promise) {
findContextOutbound(MASK_CLOSE).invokeClose(promise);
}
private void invokeClose(ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).close(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
close(promise);
try {
((ChannelOutboundHandler) handler()).close(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@ -718,26 +664,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_REGISTER);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeRegister(promise);
findAndInvokeRegister(promise);
} else {
safeExecute(executor, () -> next.invokeRegister(promise), promise, null);
safeExecute(executor, () -> findAndInvokeRegister(promise), promise, null);
}
return promise;
}
private void findAndInvokeRegister(ChannelPromise promise) {
findContextOutbound(MASK_REGISTER).invokeRegister(promise);
}
private void invokeRegister(ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).register(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
register(promise);
try {
((ChannelOutboundHandler) handler()).register(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@ -748,55 +692,51 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeDeregister(promise);
findAndInvokeDeregister(promise);
} else {
safeExecute(executor, () -> next.invokeDeregister(promise), promise, null);
safeExecute(executor, () -> findAndInvokeDeregister(promise), promise, null);
}
return promise;
}
private void findAndInvokeDeregister(ChannelPromise promise) {
findContextOutbound(MASK_DEREGISTER).invokeDeregister(promise);
}
private void invokeDeregister(ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).deregister(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
deregister(promise);
try {
((ChannelOutboundHandler) handler()).deregister(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeRead();
findAndInvokeRead();
} else {
Tasks tasks = next.invokeTasks;
Tasks tasks = invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
invokeTasks = tasks = new Tasks(this);
}
executor.execute(tasks.invokeReadTask);
}
return this;
}
private void findAndInvokeRead() {
findContextOutbound(MASK_READ).invokeRead();
}
private void invokeRead() {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
@ -813,16 +753,9 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
final Object m = pipeline.touch(msg, this);
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
((ChannelOutboundHandler) handler()).write(this, m, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
@ -830,14 +763,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
next.invokeFlush();
findAndInvokeFlush();
} else {
Tasks tasks = next.invokeTasks;
Tasks tasks = invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
invokeTasks = tasks = new Tasks(this);
}
safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null);
}
@ -845,15 +777,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return this;
}
private void invokeFlush() {
if (invokeHandler()) {
invokeFlush0();
} else {
flush();
}
private void findAndInvokeFlush() {
findContextOutbound(MASK_FLUSH).invokeFlush();
}
private void invokeFlush0() {
private void invokeFlush() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
@ -868,12 +796,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
invokeWrite(msg, promise);
invokeFlush();
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
@ -889,23 +813,23 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
throw e;
}
final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
if (flush) {
next.invokeWriteAndFlush(m, promise);
next.invokeWriteAndFlush(msg, promise);
} else {
next.invokeWrite(m, promise);
next.invokeWrite(msg, promise);
}
} else {
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
task = WriteAndFlushTask.newInstance(this, msg, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
task = WriteTask.newInstance(this, msg, promise);
}
if (!safeExecute(executor, task, promise, m)) {
if (!safeExecute(executor, task, promise, msg)) {
// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
@ -1041,28 +965,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return channel().voidPromise();
}
final void setRemoved() {
private void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
final boolean setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE) {
return false;
}
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}
final void setAddPending() {
boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
return HANDLER_STATE_UPDATER.getAndSet(this, ADD_COMPLETE) != REMOVE_COMPLETE;
}
final void callHandlerAdded() throws Exception {
@ -1085,20 +996,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
}
/**
* Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
* yet. If not return {@code false} and if called or could not detect return {@code true}.
*
* If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
* This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
* but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
*/
private boolean invokeHandler() {
// Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
@Override
public boolean isRemoved() {
return handlerState == REMOVE_COMPLETE;
@ -1140,15 +1037,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
}
private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
// Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
private static final int WRITE_TASK_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
abstract static class AbstractWriteTask implements Runnable {
private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
// Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
private static final int WRITE_TASK_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
private final Recycler.Handle<AbstractWriteTask> handle;
private AbstractChannelHandlerContext ctx;
private Object msg;
@ -1174,11 +1071,13 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
}
protected abstract AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx);
@Override
public final void run() {
try {
decrementPendingOutboundBytes();
write(ctx, msg, promise);
AbstractChannelHandlerContext next = findContext(ctx);
write(next, msg, promise);
} finally {
recycle();
}
@ -1227,6 +1126,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return task;
}
@Override
protected AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx) {
return ctx.findContextOutbound(MASK_WRITE);
}
private WriteTask(Recycler.Handle<WriteTask> handle) {
super(handle);
}
@ -1252,6 +1156,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
super(handle);
}
@Override
protected AbstractChannelHandlerContext findContext(AbstractChannelHandlerContext ctx) {
return ctx.findContextOutbound(MASK_WRITE | MASK_FLUSH);
}
@Override
public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise);
@ -1265,11 +1174,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
private final Runnable invokeChannelWritableStateChangedTask;
private final Runnable invokeFlushTask;
Tasks(AbstractChannelHandlerContext next) {
invokeChannelReadCompleteTask = next::invokeChannelReadComplete;
invokeReadTask = next::invokeRead;
invokeChannelWritableStateChangedTask = next::invokeChannelWritabilityChanged;
invokeFlushTask = next::invokeFlush;
Tasks(AbstractChannelHandlerContext ctx) {
invokeChannelReadCompleteTask = ctx::findAndInvokeChannelReadComplete;
invokeReadTask = ctx::findAndInvokeRead;
invokeChannelWritableStateChangedTask = ctx::invokeChannelWritabilityChanged;
invokeFlushTask = ctx::findAndInvokeFlush;
}
}
}

View File

@ -16,22 +16,19 @@
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
/**
* A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a
* {@link Channel}. {@link ChannelPipeline} implements an advanced form of the
* {@link Channel}. {@link ChannelPipeline} implements an advanced form of the
* <a href="http://www.oracle.com/technetwork/java/interceptingfilter-142169.html">Intercepting Filter</a> pattern
* to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline
* interact with each other.
@ -179,7 +176,7 @@ import java.util.NoSuchElementException;
* <h3>Building a pipeline</h3>
* <p>
* A user is supposed to have one or more {@link ChannelHandler}s in a pipeline to receive I/O events (e.g. read) and
* to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers
* to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers
* in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the
* protocol and business logic:
*
@ -192,7 +189,6 @@ import java.util.NoSuchElementException;
* and it could be represented as shown in the following example:
*
* <pre>
* static final {@link EventExecutorGroup} group = new {@link DefaultEventExecutorGroup}(16);
* ...
*
* {@link ChannelPipeline} pipeline = ch.pipeline();
@ -200,12 +196,9 @@ import java.util.NoSuchElementException;
* pipeline.addLast("decoder", new MyProtocolDecoder());
* pipeline.addLast("encoder", new MyProtocolEncoder());
*
* // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
* // in a different thread than an I/O thread so that the I/O thread is not blocked by
* // a time-consuming task.
* // If your business logic is fully asynchronous or finished very quickly, you don't
* // need to specify a group.
* pipeline.addLast(group.next(), "handler", new MyBusinessLogicHandler());
* // If your business logic does block or take a lot of time you should offload the work to an extra
* // {@link java.util.concurrent.Executor} to ensure you don't block the {@link EventLoop}.
* pipeline.addLast("handler", new MyBusinessLogicHandler());
* </pre>
*
* <h3>Thread safety</h3>
@ -215,7 +208,7 @@ import java.util.NoSuchElementException;
* after the exchange.
*/
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Map.Entry<String, ChannelHandler>> {
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
@ -230,21 +223,6 @@ public interface ChannelPipeline
*/
ChannelPipeline addFirst(String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
*
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}
* methods
* @param name the name of the handler to insert first
* @param handler the handler to insert first
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
*/
ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
@ -258,21 +236,6 @@ public interface ChannelPipeline
*/
ChannelPipeline addLast(String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}
* methods
* @param name the name of the handler to append
* @param handler the handler to append
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
*/
ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
@ -290,25 +253,6 @@ public interface ChannelPipeline
*/
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
*
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}
* methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert before
* @param handler the handler to insert before
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
*/
ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
@ -326,25 +270,6 @@ public interface ChannelPipeline
*/
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
*
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}
* methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after
* @param handler the handler to insert after
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
*/
ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler);
/**
* Inserts {@link ChannelHandler}s at the first position of this pipeline.
*
@ -353,16 +278,6 @@ public interface ChannelPipeline
*/
ChannelPipeline addFirst(ChannelHandler... handlers);
/**
* Inserts {@link ChannelHandler}s at the first position of this pipeline.
*
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s
* methods.
* @param handlers the handlers to insert first
*
*/
ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers);
/**
* Inserts {@link ChannelHandler}s at the last position of this pipeline.
*
@ -371,16 +286,6 @@ public interface ChannelPipeline
*/
ChannelPipeline addLast(ChannelHandler... handlers);
/**
* Inserts {@link ChannelHandler}s at the last position of this pipeline.
*
* @param executor the {@link EventExecutor} which will be used to execute the {@link ChannelHandler}s
* methods.
* @param handlers the handlers to insert last
*
*/
ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers);
/**
* Removes the specified {@link ChannelHandler} from this pipeline.
*
@ -624,4 +529,9 @@ public interface ChannelPipeline
@Override
ChannelPipeline flush();
/**
* Returns the {@link EventExecutor} which is used by all {@link ChannelHandler}s in the pipeline.
*/
EventExecutor executor();
}

View File

@ -23,8 +23,8 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, ObjectUtil.checkNotNull(handler, "handler").getClass());
DefaultChannelPipeline pipeline, String name, ChannelHandler handler) {
super(pipeline, name, ObjectUtil.checkNotNull(handler, "handler").getClass());
this.handler = handler;
}
@ -32,4 +32,9 @@ final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
public ChannelHandler handler() {
return handler;
}
@Override
public EventExecutor executor() {
return pipeline().executor();
}
}

View File

@ -30,44 +30,6 @@ import static org.junit.Assert.*;
public abstract class AbstractEventLoopTest {
/**
* Test for https://github.com/netty/netty/issues/803
*/
@Test
public void testReregister() {
EventLoopGroup group = newEventLoopGroup();
final EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(2);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelFuture future = bootstrap.channel(newChannel()).group(group)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
}).handler(new ChannelInitializer<ServerSocketChannel>() {
@Override
public void initChannel(ServerSocketChannel ch) throws Exception {
ch.pipeline().addLast(new TestChannelHandler());
ch.pipeline().addLast(eventExecutorGroup.next(), new TestChannelHandler2());
}
})
.bind(0).awaitUninterruptibly();
EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor();
EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor();
future.channel().deregister().awaitUninterruptibly();
Channel channel = future.channel().register().awaitUninterruptibly().channel();
EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor();
assertSame(executor1, executorNew);
assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor());
} finally {
group.shutdownGracefully();
eventExecutorGroup.shutdownGracefully();
}
}
@Test(timeout = 5000)
public void testShutdownGracefullyNoQuietPeriod() throws Exception {
EventLoopGroup loop = newEventLoopGroup();
@ -86,13 +48,6 @@ public abstract class AbstractEventLoopTest {
assertTrue(loop.isTerminated());
}
private static final class TestChannelHandler extends ChannelDuplexHandler { }
private static final class TestChannelHandler2 extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { }
}
protected abstract EventLoopGroup newEventLoopGroup();
protected abstract Class<? extends ServerSocketChannel> newChannel();
}

View File

@ -22,26 +22,14 @@ import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalHandler;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -49,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
public class ChannelInitializerTest {
@ -258,186 +245,6 @@ public class ChannelInitializerTest {
}
}
@SuppressWarnings("deprecation")
@Test(timeout = 10000)
public void testChannelInitializerEventExecutor() throws Throwable {
final AtomicInteger invokeCount = new AtomicInteger();
final AtomicInteger completeCount = new AtomicInteger();
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
LocalAddress addr = new LocalAddress("test");
final EventExecutor executor = new AbstractEventExecutor() {
private final ScheduledExecutorService execService = Executors.newSingleThreadScheduledExecutor();
@Override
public boolean inEventLoop(Thread thread) {
return false;
}
@Override
public boolean isShuttingDown() {
return execService.isShutdown();
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
shutdown();
return newSucceededFuture(null);
}
@Override
public Future<?> terminationFuture() {
return newFailedFuture(new UnsupportedOperationException());
}
@Override
public void shutdown() {
execService.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return execService.shutdownNow();
}
@Override
public boolean isShutdown() {
return execService.isShutdown();
}
@Override
public boolean isTerminated() {
return execService.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return execService.awaitTermination(timeout, unit);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return execService.invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return execService.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return execService.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return execService.invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
execService.execute(command);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
};
final CountDownLatch latch = new CountDownLatch(1);
ServerBootstrap serverBootstrap = new ServerBootstrap()
.channel(LocalServerChannel.class)
.group(group)
.localAddress(addr)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) {
ch.pipeline().addLast(executor, new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
invokeCount.incrementAndGet();
ChannelHandlerContext ctx = ch.pipeline().context(this);
assertNotNull(ctx);
ch.pipeline().addAfter(ctx.executor(),
ctx.name(), null, new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// just drop on the floor.
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
latch.countDown();
}
});
completeCount.incrementAndGet();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof AssertionError) {
errorRef.set(cause);
}
}
});
}
});
Channel server = serverBootstrap.bind().sync().channel();
Bootstrap clientBootstrap = new Bootstrap()
.channel(LocalChannel.class)
.group(group)
.remoteAddress(addr)
.handler(new ChannelInboundHandlerAdapter());
Channel client = clientBootstrap.connect().sync().channel();
client.writeAndFlush("Hello World").sync();
client.close().sync();
server.close().sync();
client.closeFuture().sync();
server.closeFuture().sync();
// Wait until the handler is removed from the pipeline and so no more events are handled by it.
latch.await();
assertEquals(1, invokeCount.get());
assertEquals(invokeCount.get(), completeCount.get());
Throwable cause = errorRef.get();
if (cause != null) {
throw cause;
}
executor.shutdown();
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
}
private static void closeChannel(Channel c) {
if (c != null) {
c.close().syncUninterruptibly();

View File

@ -19,17 +19,11 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import org.junit.Test;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import static io.netty.buffer.Unpooled.*;
import static org.hamcrest.Matchers.*;
@ -388,87 +382,6 @@ public class ChannelOutboundBufferTest {
safeClose(ch);
}
@Test(timeout = 5000)
public void testWriteTaskRejected() throws Exception {
final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(
new DefaultThreadFactory("executorPool"),
1, RejectedExecutionHandlers.reject()) {
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return super.newTaskQueue(1);
}
};
final CountDownLatch handlerAddedLatch = new CountDownLatch(1);
final CountDownLatch handlerRemovedLatch = new CountDownLatch(1);
EmbeddedChannel ch = new EmbeddedChannel();
ch.pipeline().addLast(executor, "handler", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.setFailure(new AssertionError("Should not be called"));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
handlerAddedLatch.countDown();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
handlerRemovedLatch.countDown();
}
});
// Lets wait until we are sure the handler was added.
handlerAddedLatch.await();
final CountDownLatch executeLatch = new CountDownLatch(1);
final CountDownLatch runLatch = new CountDownLatch(1);
executor.execute(() -> {
try {
runLatch.countDown();
executeLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
runLatch.await();
executor.execute(() -> {
// Will not be executed but ensure the pending count is 1.
});
assertEquals(1, executor.pendingTasks());
assertEquals(0, ch.unsafe().outboundBuffer().totalPendingWriteBytes());
ByteBuf buffer = buffer(128).writeZero(128);
ChannelFuture future = ch.write(buffer);
ch.runPendingTasks();
assertTrue(future.cause() instanceof RejectedExecutionException);
assertEquals(0, buffer.refCnt());
// In case of rejected task we should not have anything pending.
assertEquals(0, ch.unsafe().outboundBuffer().totalPendingWriteBytes());
executeLatch.countDown();
while (executor.pendingTasks() != 0) {
// Wait until there is no more pending task left.
Thread.sleep(10);
}
ch.pipeline().remove("handler");
// Ensure we do not try to shutdown the executor before we handled everything for the Channel. Otherwise
// the Executor may reject when the Channel tries to add a task to it.
handlerRemovedLatch.await();
safeClose(ch);
executor.shutdownGracefully();
}
private static void safeClose(EmbeddedChannel ch) {
ch.finish();
for (;;) {

View File

@ -32,14 +32,11 @@ import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
@ -51,21 +48,17 @@ import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@ -672,37 +665,6 @@ public class DefaultChannelPipelineTest {
assertSame(exception, error.get());
}
@Test
public void testChannelUnregistrationWithCustomExecutor() throws Exception {
final CountDownLatch channelLatch = new CountDownLatch(1);
final CountDownLatch handlerLatch = new CountDownLatch(1);
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new WrapperExecutor(),
new ChannelInboundHandlerAdapter() {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
channelLatch.countDown();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
handlerLatch.countDown();
}
});
}
});
Channel channel = pipeline.channel();
channel.register().sync();
channel.close();
channel.deregister();
assertTrue(channelLatch.await(2, TimeUnit.SECONDS));
assertTrue(handlerLatch.await(2, TimeUnit.SECONDS));
}
@Test(timeout = 3000)
public void testAddHandlerBeforeRegisteredThenRemove() {
final EventLoop loop = group.next();
@ -740,129 +702,6 @@ public class DefaultChannelPipelineTest {
pipeline.channel().close().syncUninterruptibly();
}
@Test(timeout = 3000)
public void testHandlerAddedAndRemovedCalledInCorrectOrder() throws Throwable {
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
final EventExecutorGroup group2 = new DefaultEventExecutorGroup(1);
try {
BlockingQueue<CheckOrderHandler> addedQueue = new LinkedBlockingQueue<>();
BlockingQueue<CheckOrderHandler> removedQueue = new LinkedBlockingQueue<>();
CheckOrderHandler handler1 = new CheckOrderHandler(addedQueue, removedQueue);
CheckOrderHandler handler2 = new CheckOrderHandler(addedQueue, removedQueue);
CheckOrderHandler handler3 = new CheckOrderHandler(addedQueue, removedQueue);
CheckOrderHandler handler4 = new CheckOrderHandler(addedQueue, removedQueue);
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(handler1);
pipeline.channel().register().syncUninterruptibly();
pipeline.addLast(group1.next(), handler2);
pipeline.addLast(group2.next(), handler3);
pipeline.addLast(handler4);
assertTrue(removedQueue.isEmpty());
pipeline.channel().close().syncUninterruptibly();
assertHandler(addedQueue.take(), handler1);
// Depending on timing this can be handler2 or handler3 as these use different EventExecutorGroups.
assertHandler(addedQueue.take(), handler2, handler3, handler4);
assertHandler(addedQueue.take(), handler2, handler3, handler4);
assertHandler(addedQueue.take(), handler2, handler3, handler4);
assertTrue(addedQueue.isEmpty());
assertHandler(removedQueue.take(), handler4);
assertHandler(removedQueue.take(), handler3);
assertHandler(removedQueue.take(), handler2);
assertHandler(removedQueue.take(), handler1);
assertTrue(removedQueue.isEmpty());
} finally {
group1.shutdownGracefully();
group2.shutdownGracefully();
}
}
@Test(timeout = 3000)
public void testHandlerAddedExceptionFromChildHandlerIsPropagated() {
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
try {
final Promise<Void> promise = group1.next().newPromise();
final Exception exception = new RuntimeException();
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise));
pipeline.addFirst(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
throw exception;
}
});
pipeline.channel().register();
promise.syncUninterruptibly();
pipeline.channel().close().syncUninterruptibly();
} finally {
group1.shutdownGracefully();
}
}
@Test(timeout = 3000)
public void testHandlerRemovedExceptionFromChildHandlerIsPropagated() {
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
try {
final Promise<Void> promise = group1.next().newPromise();
String handlerName = "foo";
final Exception exception = new RuntimeException();
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(handlerName, new ChannelHandlerAdapter() {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
throw exception;
}
});
pipeline.addLast(group1.next(), new CheckExceptionHandler(exception, promise));
pipeline.channel().register().syncUninterruptibly();
pipeline.remove(handlerName);
promise.syncUninterruptibly();
pipeline.channel().close().syncUninterruptibly();
} finally {
group1.shutdownGracefully();
}
}
@Test(timeout = 3000)
public void testHandlerAddedThrowsAndRemovedThrowsException() throws InterruptedException {
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
try {
final CountDownLatch latch = new CountDownLatch(1);
final Promise<Void> promise = group1.next().newPromise();
final Exception exceptionAdded = new RuntimeException();
final Exception exceptionRemoved = new RuntimeException();
String handlerName = "foo";
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(group1.next(), new CheckExceptionHandler(exceptionAdded, promise));
pipeline.addFirst(handlerName, new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
throw exceptionAdded;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// Execute this later so we are sure the exception is handled first.
ctx.executor().execute(latch::countDown);
throw exceptionRemoved;
}
});
pipeline.register().syncUninterruptibly();
latch.await();
assertNull(pipeline.context(handlerName));
promise.syncUninterruptibly();
pipeline.channel().close().syncUninterruptibly();
} finally {
group1.shutdownGracefully();
}
}
@Test(timeout = 2000)
public void testAddRemoveHandlerCalled() throws Throwable {
ChannelPipeline pipeline = newLocalChannel().pipeline();
@ -1006,76 +845,6 @@ public class DefaultChannelPipelineTest {
pipeline.addBefore("test", null, newHandler());
}
@Test(timeout = 3000)
public void testUnorderedEventExecutor() throws Throwable {
EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(2);
EventLoopGroup defaultGroup = new MultithreadEventLoopGroup(1, LocalHandler.newFactory());
try {
EventLoop eventLoop1 = defaultGroup.next();
ChannelPipeline pipeline1 = new LocalChannel(eventLoop1).pipeline();
pipeline1.channel().register().syncUninterruptibly();
final CountDownLatch latch = new CountDownLatch(1);
pipeline1.addLast(eventExecutors.next(), new ChannelInboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Just block one of the two threads.
LockSupport.park();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
latch.countDown();
}
});
// Trigger an event, as we use UnorderedEventExecutor userEventTriggered should be called even when
// handlerAdded(...) blocks.
pipeline1.fireUserEventTriggered("");
latch.await();
} finally {
defaultGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).syncUninterruptibly();
eventExecutors.shutdownGracefully(0, 0, TimeUnit.SECONDS).syncUninterruptibly();
}
}
@Test
public void testPinExecutor() {
EventExecutorGroup group = new DefaultEventExecutorGroup(2);
ChannelPipeline pipeline = newLocalChannel().pipeline();
ChannelPipeline pipeline2 = newLocalChannel().pipeline();
EventExecutor executor = group.next();
pipeline.addLast(executor, "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(executor, "h2", new ChannelInboundHandlerAdapter());
pipeline2.addLast(group.next(), "h3", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();
assertNotNull(executor1);
assertNotNull(executor2);
assertSame(executor1, executor2);
EventExecutor executor3 = pipeline2.context("h3").executor();
assertNotNull(executor3);
assertNotSame(executor3, executor2);
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
@Test
public void testNotPinExecutor() {
EventExecutorGroup group = new DefaultEventExecutorGroup(2);
ChannelPipeline pipeline = newLocalChannel().pipeline();
pipeline.addLast(group.next(), "h1", new ChannelInboundHandlerAdapter());
pipeline.addLast(group.next(), "h2", new ChannelInboundHandlerAdapter());
EventExecutor executor1 = pipeline.context("h1").executor();
EventExecutor executor2 = pipeline.context("h2").executor();
assertNotNull(executor1);
assertNotNull(executor2);
assertNotSame(executor1, executor2);
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
@Test(timeout = 3000)
public void testVoidPromiseNotify() throws Throwable {
EventLoopGroup defaultGroup = new MultithreadEventLoopGroup(1, LocalHandler.newFactory());
@ -1832,13 +1601,21 @@ public class DefaultChannelPipelineTest {
}
private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
int handlerNumber = 0;
while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
handlerNumber++;
ctx = ctx.next;
}
assertEquals(expectedNumber, handlerNumber);
assertEquals(expectedNumber, pipeline.names().size());
assertEquals(expectedNumber, pipeline.toMap().size());
pipeline.executor().submit(new Runnable() {
@Override
public void run() {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) pipeline.firstContext();
int handlerNumber = 0;
while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
handlerNumber++;
ctx = ctx.next;
}
assertEquals(expectedNumber, handlerNumber);
}
}).syncUninterruptibly();
}
private static ChannelHandler[] newHandlers(int num) {

View File

@ -1,597 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
public class LocalTransportThreadModelTest {
private static EventLoopGroup group;
private static LocalAddress localAddr;
@BeforeClass
public static void init() {
// Configure a test server
group = new MultithreadEventLoopGroup(LocalHandler.newFactory());
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
}
});
localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}
@AfterClass
public static void destroy() throws Exception {
group.shutdownGracefully().sync();
}
@Test(timeout = 30000)
@Ignore("regression test")
public void testStagedExecutionMultiple() throws Throwable {
for (int i = 0; i < 10; i ++) {
testStagedExecution();
}
}
@Test(timeout = 5000)
public void testStagedExecution() throws Throwable {
EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"),
LocalHandler.newFactory());
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
ThreadNameAuditor h1 = new ThreadNameAuditor();
ThreadNameAuditor h2 = new ThreadNameAuditor();
ThreadNameAuditor h3 = new ThreadNameAuditor(true);
Channel ch = new LocalChannel(l.next());
// With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'.
ch.pipeline().addLast(h1);
// h2 will be always invoked by EventExecutor 'e1'.
ch.pipeline().addLast(e1.next(), h2);
// h3 will be always invoked by EventExecutor 'e2'.
ch.pipeline().addLast(e2.next(), h3);
ch.register().sync().channel().connect(localAddr).sync();
// Fire inbound events from all possible starting points.
ch.pipeline().fireChannelRead("1");
ch.pipeline().context(h1).fireChannelRead("2");
ch.pipeline().context(h2).fireChannelRead("3");
ch.pipeline().context(h3).fireChannelRead("4");
// Fire outbound events from all possible starting points.
ch.pipeline().write("5");
ch.pipeline().context(h3).write("6");
ch.pipeline().context(h2).write("7");
ch.pipeline().context(h1).writeAndFlush("8").sync();
ch.close().sync();
// Wait until all events are handled completely.
while (h1.outboundThreadNames.size() < 3 || h3.inboundThreadNames.size() < 3 ||
h1.removalThreadNames.size() < 1) {
if (h1.exception.get() != null) {
throw h1.exception.get();
}
if (h2.exception.get() != null) {
throw h2.exception.get();
}
if (h3.exception.get() != null) {
throw h3.exception.get();
}
Thread.sleep(10);
}
String currentName = Thread.currentThread().getName();
try {
// Events should never be handled from the current thread.
Assert.assertFalse(h1.inboundThreadNames.contains(currentName));
Assert.assertFalse(h2.inboundThreadNames.contains(currentName));
Assert.assertFalse(h3.inboundThreadNames.contains(currentName));
Assert.assertFalse(h1.outboundThreadNames.contains(currentName));
Assert.assertFalse(h2.outboundThreadNames.contains(currentName));
Assert.assertFalse(h3.outboundThreadNames.contains(currentName));
Assert.assertFalse(h1.removalThreadNames.contains(currentName));
Assert.assertFalse(h2.removalThreadNames.contains(currentName));
Assert.assertFalse(h3.removalThreadNames.contains(currentName));
// Assert that events were handled by the correct executor.
for (String name: h1.inboundThreadNames) {
Assert.assertTrue(name.startsWith("l-"));
}
for (String name: h2.inboundThreadNames) {
Assert.assertTrue(name.startsWith("e1-"));
}
for (String name: h3.inboundThreadNames) {
Assert.assertTrue(name.startsWith("e2-"));
}
for (String name: h1.outboundThreadNames) {
Assert.assertTrue(name.startsWith("l-"));
}
for (String name: h2.outboundThreadNames) {
Assert.assertTrue(name.startsWith("e1-"));
}
for (String name: h3.outboundThreadNames) {
Assert.assertTrue(name.startsWith("e2-"));
}
for (String name: h1.removalThreadNames) {
Assert.assertTrue(name.startsWith("l-"));
}
for (String name: h2.removalThreadNames) {
Assert.assertTrue(name.startsWith("e1-"));
}
for (String name: h3.removalThreadNames) {
Assert.assertTrue(name.startsWith("e2-"));
}
// Assert that the events for the same handler were handled by the same thread.
Set<String> names = new HashSet<>();
names.addAll(h1.inboundThreadNames);
names.addAll(h1.outboundThreadNames);
names.addAll(h1.removalThreadNames);
Assert.assertEquals(1, names.size());
names.clear();
names.addAll(h2.inboundThreadNames);
names.addAll(h2.outboundThreadNames);
names.addAll(h2.removalThreadNames);
Assert.assertEquals(1, names.size());
names.clear();
names.addAll(h3.inboundThreadNames);
names.addAll(h3.outboundThreadNames);
names.addAll(h3.removalThreadNames);
Assert.assertEquals(1, names.size());
// Count the number of events
Assert.assertEquals(1, h1.inboundThreadNames.size());
Assert.assertEquals(2, h2.inboundThreadNames.size());
Assert.assertEquals(3, h3.inboundThreadNames.size());
Assert.assertEquals(3, h1.outboundThreadNames.size());
Assert.assertEquals(2, h2.outboundThreadNames.size());
Assert.assertEquals(1, h3.outboundThreadNames.size());
Assert.assertEquals(1, h1.removalThreadNames.size());
Assert.assertEquals(1, h2.removalThreadNames.size());
Assert.assertEquals(1, h3.removalThreadNames.size());
} catch (AssertionError e) {
System.out.println("H1I: " + h1.inboundThreadNames);
System.out.println("H2I: " + h2.inboundThreadNames);
System.out.println("H3I: " + h3.inboundThreadNames);
System.out.println("H1O: " + h1.outboundThreadNames);
System.out.println("H2O: " + h2.outboundThreadNames);
System.out.println("H3O: " + h3.outboundThreadNames);
System.out.println("H1R: " + h1.removalThreadNames);
System.out.println("H2R: " + h2.removalThreadNames);
System.out.println("H3R: " + h3.removalThreadNames);
throw e;
} finally {
l.shutdownGracefully();
e1.shutdownGracefully();
e2.shutdownGracefully();
l.terminationFuture().sync();
e1.terminationFuture().sync();
e2.terminationFuture().sync();
}
}
@Test(timeout = 30000)
@Ignore
public void testConcurrentMessageBufferAccess() throws Throwable {
EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"),
LocalHandler.newFactory());
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
try {
final MessageForwarder1 h1 = new MessageForwarder1();
final MessageForwarder2 h2 = new MessageForwarder2();
final MessageForwarder3 h3 = new MessageForwarder3();
final MessageForwarder1 h4 = new MessageForwarder1();
final MessageForwarder2 h5 = new MessageForwarder2();
final MessageDiscarder h6 = new MessageDiscarder();
final Channel ch = new LocalChannel(l.next());
// inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null
// outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null
ch.pipeline().addLast(h1)
.addLast(e1.next(), h2)
.addLast(e2.next(), h3)
.addLast(e3.next(), h4)
.addLast(e4.next(), h5)
.addLast(e5.next(), h6);
ch.register().sync().channel().connect(localAddr).sync();
final int ROUNDS = 1024;
final int ELEMS_PER_ROUNDS = 8192;
final int TOTAL_CNT = ROUNDS * ELEMS_PER_ROUNDS;
for (int i = 0; i < TOTAL_CNT;) {
final int start = i;
final int end = i + ELEMS_PER_ROUNDS;
i = end;
ch.eventLoop().execute(() -> {
for (int j = start; j < end; j ++) {
ch.pipeline().fireChannelRead(Integer.valueOf(j));
}
});
}
while (h1.inCnt < TOTAL_CNT || h2.inCnt < TOTAL_CNT || h3.inCnt < TOTAL_CNT ||
h4.inCnt < TOTAL_CNT || h5.inCnt < TOTAL_CNT || h6.inCnt < TOTAL_CNT) {
if (h1.exception.get() != null) {
throw h1.exception.get();
}
if (h2.exception.get() != null) {
throw h2.exception.get();
}
if (h3.exception.get() != null) {
throw h3.exception.get();
}
if (h4.exception.get() != null) {
throw h4.exception.get();
}
if (h5.exception.get() != null) {
throw h5.exception.get();
}
if (h6.exception.get() != null) {
throw h6.exception.get();
}
Thread.sleep(10);
}
for (int i = 0; i < TOTAL_CNT;) {
final int start = i;
final int end = i + ELEMS_PER_ROUNDS;
i = end;
ch.pipeline().context(h6).executor().execute(() -> {
for (int j = start; j < end; j ++) {
ch.write(Integer.valueOf(j));
}
ch.flush();
});
}
while (h1.outCnt < TOTAL_CNT || h2.outCnt < TOTAL_CNT || h3.outCnt < TOTAL_CNT ||
h4.outCnt < TOTAL_CNT || h5.outCnt < TOTAL_CNT || h6.outCnt < TOTAL_CNT) {
if (h1.exception.get() != null) {
throw h1.exception.get();
}
if (h2.exception.get() != null) {
throw h2.exception.get();
}
if (h3.exception.get() != null) {
throw h3.exception.get();
}
if (h4.exception.get() != null) {
throw h4.exception.get();
}
if (h5.exception.get() != null) {
throw h5.exception.get();
}
if (h6.exception.get() != null) {
throw h6.exception.get();
}
Thread.sleep(10);
}
ch.close().sync();
} finally {
l.shutdownGracefully();
e1.shutdownGracefully();
e2.shutdownGracefully();
e3.shutdownGracefully();
e4.shutdownGracefully();
e5.shutdownGracefully();
l.terminationFuture().sync();
e1.terminationFuture().sync();
e2.terminationFuture().sync();
e3.terminationFuture().sync();
e4.terminationFuture().sync();
e5.terminationFuture().sync();
}
}
private static class ThreadNameAuditor extends ChannelDuplexHandler {
private final AtomicReference<Throwable> exception = new AtomicReference<>();
private final Queue<String> inboundThreadNames = new ConcurrentLinkedQueue<>();
private final Queue<String> outboundThreadNames = new ConcurrentLinkedQueue<>();
private final Queue<String> removalThreadNames = new ConcurrentLinkedQueue<>();
private final boolean discard;
ThreadNameAuditor() {
this(false);
}
ThreadNameAuditor(boolean discard) {
this.discard = discard;
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
removalThreadNames.add(Thread.currentThread().getName());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundThreadNames.add(Thread.currentThread().getName());
if (!discard) {
ctx.fireChannelRead(msg);
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
outboundThreadNames.add(Thread.currentThread().getName());
ctx.write(msg, promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exception.compareAndSet(null, cause);
System.err.print('[' + Thread.currentThread().getName() + "] ");
cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
}
/**
* Converts integers into a binary stream.
*/
private static class MessageForwarder1 extends ChannelDuplexHandler {
private final AtomicReference<Throwable> exception = new AtomicReference<>();
private volatile int inCnt;
private volatile int outCnt;
private volatile Thread t;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Thread t = this.t;
if (t == null) {
this.t = Thread.currentThread();
} else {
Assert.assertSame(t, Thread.currentThread());
}
ByteBuf out = ctx.alloc().buffer(4);
int m = ((Integer) msg).intValue();
int expected = inCnt ++;
Assert.assertEquals(expected, m);
out.writeInt(m);
ctx.fireChannelRead(out);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
// Don't let the write request go to the server-side channel - just swallow.
boolean swallow = this == ctx.pipeline().first();
ByteBuf m = (ByteBuf) msg;
int count = m.readableBytes() / 4;
for (int j = 0; j < count; j ++) {
int actual = m.readInt();
int expected = outCnt ++;
Assert.assertEquals(expected, actual);
if (!swallow) {
ctx.write(actual);
}
}
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
m.release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exception.compareAndSet(null, cause);
//System.err.print("[" + Thread.currentThread().getName() + "] ");
//cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
}
/**
* Converts a binary stream into integers.
*/
private static class MessageForwarder2 extends ChannelDuplexHandler {
private final AtomicReference<Throwable> exception = new AtomicReference<>();
private volatile int inCnt;
private volatile int outCnt;
private volatile Thread t;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Thread t = this.t;
if (t == null) {
this.t = Thread.currentThread();
} else {
Assert.assertSame(t, Thread.currentThread());
}
ByteBuf m = (ByteBuf) msg;
int count = m.readableBytes() / 4;
for (int j = 0; j < count; j ++) {
int actual = m.readInt();
int expected = inCnt ++;
Assert.assertEquals(expected, actual);
ctx.fireChannelRead(actual);
}
m.release();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
ByteBuf out = ctx.alloc().buffer(4);
int m = (Integer) msg;
int expected = outCnt ++;
Assert.assertEquals(expected, m);
out.writeInt(m);
ctx.write(out, promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exception.compareAndSet(null, cause);
//System.err.print("[" + Thread.currentThread().getName() + "] ");
//cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
}
/**
* Simply forwards the received object to the next handler.
*/
private static class MessageForwarder3 extends ChannelDuplexHandler {
private final AtomicReference<Throwable> exception = new AtomicReference<>();
private volatile int inCnt;
private volatile int outCnt;
private volatile Thread t;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Thread t = this.t;
if (t == null) {
this.t = Thread.currentThread();
} else {
Assert.assertSame(t, Thread.currentThread());
}
int actual = (Integer) msg;
int expected = inCnt ++;
Assert.assertEquals(expected, actual);
ctx.fireChannelRead(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
int actual = (Integer) msg;
int expected = outCnt ++;
Assert.assertEquals(expected, actual);
ctx.write(msg, promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exception.compareAndSet(null, cause);
System.err.print('[' + Thread.currentThread().getName() + "] ");
cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
}
/**
* Discards all received messages.
*/
private static class MessageDiscarder extends ChannelDuplexHandler {
private final AtomicReference<Throwable> exception = new AtomicReference<>();
private volatile int inCnt;
private volatile int outCnt;
private volatile Thread t;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Thread t = this.t;
if (t == null) {
this.t = Thread.currentThread();
} else {
Assert.assertSame(t, Thread.currentThread());
}
int actual = (Integer) msg;
int expected = inCnt ++;
Assert.assertEquals(expected, actual);
}
@Override
public void write(
ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
int actual = (Integer) msg;
int expected = outCnt ++;
Assert.assertEquals(expected, actual);
ctx.write(msg, promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exception.compareAndSet(null, cause);
//System.err.print("[" + Thread.currentThread().getName() + "] ");
//cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
}
}

View File

@ -1,326 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
public class LocalTransportThreadModelTest3 {
enum EventType {
EXCEPTION_CAUGHT,
USER_EVENT,
MESSAGE_RECEIVED_LAST,
INACTIVE,
ACTIVE,
UNREGISTERED,
REGISTERED,
MESSAGE_RECEIVED,
WRITE,
READ
}
private static EventLoopGroup group;
private static LocalAddress localAddr;
@BeforeClass
public static void init() {
// Configure a test server
group = new MultithreadEventLoopGroup(LocalHandler.newFactory());
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
}
});
localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}
@AfterClass
public static void destroy() throws Exception {
group.shutdownGracefully().sync();
}
@Test(timeout = 60000)
@Ignore("regression test")
public void testConcurrentAddRemoveInboundEventsMultiple() throws Throwable {
for (int i = 0; i < 50; i ++) {
testConcurrentAddRemoveInboundEvents();
}
}
@Test(timeout = 60000)
@Ignore("regression test")
public void testConcurrentAddRemoveOutboundEventsMultiple() throws Throwable {
for (int i = 0; i < 50; i ++) {
testConcurrentAddRemoveOutboundEvents();
}
}
@Test(timeout = 30000)
@Ignore("needs a fix")
public void testConcurrentAddRemoveInboundEvents() throws Throwable {
testConcurrentAddRemove(true);
}
@Test(timeout = 30000)
@Ignore("needs a fix")
public void testConcurrentAddRemoveOutboundEvents() throws Throwable {
testConcurrentAddRemove(false);
}
private static void testConcurrentAddRemove(boolean inbound) throws Exception {
EventLoopGroup l = new MultithreadEventLoopGroup(4, new DefaultThreadFactory("l"),
LocalHandler.newFactory());
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
final EventExecutorGroup[] groups = {e1, e2, e3, e4, e5};
try {
Deque<EventType> events = new ConcurrentLinkedDeque<>();
final EventForwarder h1 = new EventForwarder();
final EventForwarder h2 = new EventForwarder();
final EventForwarder h3 = new EventForwarder();
final EventForwarder h4 = new EventForwarder();
final EventForwarder h5 = new EventForwarder();
final EventRecorder h6 = new EventRecorder(events, inbound);
final Channel ch = new LocalChannel(l.next());
if (!inbound) {
ch.config().setAutoRead(false);
}
ch.pipeline().addLast(e1.next(), h1)
.addLast(e1.next(), h2)
.addLast(e1.next(), h3)
.addLast(e1.next(), h4)
.addLast(e1.next(), h5)
.addLast(e1.next(), "recorder", h6);
ch.register().sync().channel().connect(localAddr).sync();
final LinkedList<EventType> expectedEvents = events(inbound, 8192);
Throwable cause = new Throwable();
Thread pipelineModifier = new Thread(() -> {
Random random = new Random();
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
return;
}
if (!ch.isRegistered()) {
continue;
}
//EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)];
ChannelHandler handler = ch.pipeline().removeFirst();
ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder",
UUID.randomUUID().toString(), handler);
}
});
pipelineModifier.setDaemon(true);
pipelineModifier.start();
for (EventType event: expectedEvents) {
switch (event) {
case EXCEPTION_CAUGHT:
ch.pipeline().fireExceptionCaught(cause);
break;
case MESSAGE_RECEIVED:
ch.pipeline().fireChannelRead("");
break;
case MESSAGE_RECEIVED_LAST:
ch.pipeline().fireChannelReadComplete();
break;
case USER_EVENT:
ch.pipeline().fireUserEventTriggered("");
break;
case WRITE:
ch.pipeline().write("");
break;
case READ:
ch.pipeline().read();
break;
}
}
ch.close().sync();
while (events.peekLast() != EventType.UNREGISTERED) {
Thread.sleep(10);
}
expectedEvents.addFirst(EventType.ACTIVE);
expectedEvents.addFirst(EventType.REGISTERED);
expectedEvents.addLast(EventType.INACTIVE);
expectedEvents.addLast(EventType.UNREGISTERED);
for (;;) {
EventType event = events.poll();
if (event == null) {
Assert.assertTrue("Missing events:" + expectedEvents, expectedEvents.isEmpty());
break;
}
Assert.assertEquals(event, expectedEvents.poll());
}
} finally {
l.shutdownGracefully();
e1.shutdownGracefully();
e2.shutdownGracefully();
e3.shutdownGracefully();
e4.shutdownGracefully();
e5.shutdownGracefully();
l.terminationFuture().sync();
e1.terminationFuture().sync();
e2.terminationFuture().sync();
e3.terminationFuture().sync();
e4.terminationFuture().sync();
e5.terminationFuture().sync();
}
}
private static LinkedList<EventType> events(boolean inbound, int size) {
EventType[] events;
if (inbound) {
events = new EventType[] {
EventType.USER_EVENT, EventType.MESSAGE_RECEIVED, EventType.MESSAGE_RECEIVED_LAST,
EventType.EXCEPTION_CAUGHT};
} else {
events = new EventType[] {
EventType.READ, EventType.WRITE, EventType.EXCEPTION_CAUGHT };
}
Random random = new Random();
LinkedList<EventType> expectedEvents = new LinkedList<>();
for (int i = 0; i < size; i++) {
expectedEvents.add(events[random.nextInt(events.length)]);
}
return expectedEvents;
}
@ChannelHandler.Sharable
private static final class EventForwarder extends ChannelDuplexHandler { }
private static final class EventRecorder extends ChannelDuplexHandler {
private final Queue<EventType> events;
private final boolean inbound;
EventRecorder(Queue<EventType> events, boolean inbound) {
this.events = events;
this.inbound = inbound;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
events.add(EventType.EXCEPTION_CAUGHT);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (inbound) {
events.add(EventType.USER_EVENT);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (inbound) {
events.add(EventType.MESSAGE_RECEIVED_LAST);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
events.add(EventType.INACTIVE);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
events.add(EventType.ACTIVE);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
events.add(EventType.UNREGISTERED);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
events.add(EventType.REGISTERED);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (inbound) {
events.add(EventType.MESSAGE_RECEIVED);
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (!inbound) {
events.add(EventType.WRITE);
}
promise.setSuccess();
}
@Override
public void read(ChannelHandlerContext ctx) {
if (!inbound) {
events.add(EventType.READ);
}
}
}
}