[#5104] Fix possible deadlock in DefaultChannelPipeline

Motivation:

When a user has multiple EventLoops in an EventLoopGroup and calls pipeline.add* / remove* / replace from an EventLoop that belongs to another Channel it is possible to deadlock if the other EventLoop does the same.

Modification:

- Only ensure the actual modification takes place in a synchronized block and not wait until the handlerAdded(...) / handlerRemoved(...) method is called. This is ok as we submit the task to the executor while still holding the look and so ensure correct order of pipeline modifications.
- Ensure if an AbstractChannelHandlerContext is put in the linked-list structure but the handlerAdded(...) method was not called we skip it until handlerAdded(...) was called. This is needed to ensure handlerAdded(...) is always called first.

Result:

Its not possible to deadlock when modify the DefaultChannelPipeline.
This commit is contained in:
Norman Maurer 2016-04-09 21:38:45 +02:00
parent 3a9f472161
commit 8fe3c83e4c
3 changed files with 340 additions and 275 deletions

View File

@ -39,16 +39,30 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
private static final int ADDED = 1;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int REMOVED = 2;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int INIT = 0;
private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
private boolean handlerRemoved;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;
private int handlerState = INIT;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// These needs to be volatile as otherwise an other Thread may see an half initialized instance.
@ -119,10 +133,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
@ -144,10 +162,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeChannelUnregistered() {
try {
((ChannelInboundHandler) handler()).channelUnregistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelInboundHandler) handler()).channelUnregistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelUnregistered();
}
}
@ -169,10 +191,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
@ -194,10 +220,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeChannelInactive() {
try {
((ChannelInboundHandler) handler()).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelInboundHandler) handler()).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelInactive();
}
}
@ -231,14 +261,18 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeExceptionCaught(final Throwable cause) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
if (isAdded()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
}
}
} else {
fireExceptionCaught(cause);
}
}
@ -264,10 +298,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeUserEventTriggered(Object event) {
try {
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireUserEventTriggered(event);
}
}
@ -294,10 +332,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
@ -323,10 +365,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeChannelReadComplete() {
try {
((ChannelInboundHandler) handler()).channelReadComplete(this);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelInboundHandler) handler()).channelReadComplete(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelReadComplete();
}
}
@ -352,10 +398,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeChannelWritabilityChanged() {
try {
((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelWritabilityChanged();
}
}
@ -415,10 +465,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
if (isAdded()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
@ -455,10 +509,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
if (isAdded()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
@ -495,10 +553,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeDisconnect(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).disconnect(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
if (isAdded()) {
try {
((ChannelOutboundHandler) handler()).disconnect(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
disconnect(promise);
}
}
@ -526,10 +588,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeClose(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).close(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
if (isAdded()) {
try {
((ChannelOutboundHandler) handler()).close(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
close(promise);
}
}
@ -557,10 +623,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeDeregister(ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).deregister(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
if (isAdded()) {
try {
((ChannelOutboundHandler) handler()).deregister(this, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
deregister(promise);
}
}
@ -587,10 +657,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeRead() {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
if (isAdded()) {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
}
}
@ -621,6 +695,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeWrite(Object msg, ChannelPromise promise) {
if (isAdded()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
@ -651,6 +733,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void invokeFlush() {
if (isAdded()) {
invokeFlush0();
} else {
flush();
}
}
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
@ -675,14 +765,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return promise;
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (isAdded()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(m, promise);
if (flush) {
next.invokeFlush();
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
@ -823,13 +923,29 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
return channel().voidPromise();
}
void setRemoved() {
handlerRemoved = true;
final void setRemoved() {
handlerState = REMOVED;
}
final void setAdded() {
handlerState = ADDED;
}
/**
* Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
* yet. If not return {@code false} and if called or could not detect return {@code true}.
*
* If this method returns {@code true} we will not invoke the {@link ChannelHandler} but just forward the event.
* This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
* but not called {@link }
*/
private boolean isAdded() {
return handlerState == ADDED;
}
@Override
public boolean isRemoved() {
return handlerRemoved;
return handlerState == REMOVED;
}
@Override

View File

@ -22,7 +22,6 @@ import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -36,8 +35,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
/**
@ -139,7 +136,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
final EventExecutor executor;
final boolean inEventLoop;
synchronized (this) {
if (name == null) {
name = generateName(handler);
@ -151,33 +147,27 @@ final class DefaultChannelPipeline implements ChannelPipeline {
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addFirst0(newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) {
addFirst0(newCtx);
callHandlerCallbackLater(newCtx, true);
return this;
}
inEventLoop = executor.inEventLoop();
if (inEventLoop) {
addFirst0(newCtx);
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
if (inEventLoop) {
callHandlerAdded0(newCtx);
} else {
waitForFuture(executor.submit(new OneTimeTask() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
addFirst0(newCtx);
}
callHandlerAdded0(newCtx);
}
}));
}
callHandlerAdded0(newCtx);
return this;
}
@ -198,7 +188,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
final boolean inEventLoop;
synchronized (this) {
if (name == null) {
name = generateName(handler);
@ -210,32 +199,26 @@ final class DefaultChannelPipeline implements ChannelPipeline {
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addLast0(newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) {
addLast0(newCtx);
callHandlerCallbackLater(newCtx, true);
return this;
}
inEventLoop = executor.inEventLoop();
if (inEventLoop) {
addLast0(newCtx);
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
if (inEventLoop) {
callHandlerAdded0(newCtx);
} else {
waitForFuture(executor.submit(new OneTimeTask() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
addLast0(newCtx);
}
callHandlerAdded0(newCtx);
}
}));
}
callHandlerAdded0(newCtx);
return this;
}
@ -258,7 +241,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
final boolean inEventLoop;
synchronized (this) {
checkMultiplicity(handler);
ctx = getContextOrDie(baseName);
@ -271,34 +253,27 @@ final class DefaultChannelPipeline implements ChannelPipeline {
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addBefore0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) {
addBefore0(ctx, newCtx);
callHandlerCallbackLater(newCtx, true);
return this;
}
inEventLoop = executor.inEventLoop();
if (inEventLoop) {
addBefore0(ctx, newCtx);
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
if (inEventLoop) {
callHandlerAdded0(newCtx);
} else {
waitForFuture(executor.submit(new OneTimeTask() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
addBefore0(ctx, newCtx);
}
callHandlerAdded0(newCtx);
}
}));
}
callHandlerAdded0(newCtx);
return this;
}
@ -320,7 +295,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
final boolean inEventLoop;
synchronized (this) {
checkMultiplicity(handler);
@ -330,32 +304,26 @@ final class DefaultChannelPipeline implements ChannelPipeline {
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addAfter0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) {
addAfter0(ctx, newCtx);
callHandlerCallbackLater(newCtx, true);
return this;
}
inEventLoop = executor.inEventLoop();
if (inEventLoop) {
addAfter0(ctx, newCtx);
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
if (inEventLoop) {
callHandlerAdded0(newCtx);
} else {
waitForFuture(executor.submit(new OneTimeTask() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
addAfter0(ctx, newCtx);
}
callHandlerAdded0(newCtx);
}
}));
}
callHandlerAdded0(newCtx);
return this;
}
@ -465,36 +433,30 @@ final class DefaultChannelPipeline implements ChannelPipeline {
assert ctx != head && ctx != tail;
final EventExecutor executor;
final boolean inEventLoop;
synchronized (this) {
executor = executorSafe(ctx.executor);
remove0(ctx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) {
remove0(ctx);
callHandlerCallbackLater(ctx, false);
return ctx;
}
inEventLoop = executor.inEventLoop();
if (inEventLoop) {
remove0(ctx);
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
if (inEventLoop) {
callHandlerRemoved0(ctx);
} else {
waitForFuture(executor.submit(new OneTimeTask() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
remove0(ctx);
}
callHandlerRemoved0(ctx);
}
}));
}
callHandlerRemoved0(ctx);
return ctx;
}
@ -545,7 +507,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext newCtx;
final EventExecutor executor;
final boolean inEventLoop;
synchronized (this) {
checkMultiplicity(newHandler);
if (newName == null) {
@ -560,42 +521,36 @@ final class DefaultChannelPipeline implements ChannelPipeline {
newCtx = newContext(ctx.executor, newName, newHandler);
executor = executorSafe(ctx.executor);
replace0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we replace the context in the pipeline
// and add a task that will call ChannelHandler.handlerAdded(...) and
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) {
replace0(ctx, newCtx);
callHandlerCallbackLater(newCtx, true);
callHandlerCallbackLater(ctx, false);
return ctx.handler();
}
inEventLoop = executor.inEventLoop();
if (inEventLoop) {
replace0(ctx, newCtx);
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
public void run() {
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
// those event handlers must be called after handlerAdded().
callHandlerAdded0(newCtx);
callHandlerRemoved0(ctx);
}
});
return ctx.handler();
}
}
if (inEventLoop) {
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
// event handlers must be called after handlerAdded().
callHandlerAdded0(newCtx);
callHandlerRemoved0(ctx);
} else {
waitForFuture(executor.submit(new OneTimeTask() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
replace0(ctx, newCtx);
}
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
// those event handlers must be called after handlerAdded().
callHandlerAdded0(newCtx);
callHandlerRemoved0(ctx);
}
}));
}
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
// event handlers must be called after handlerAdded().
callHandlerAdded0(newCtx);
callHandlerRemoved0(ctx);
return ctx.handler();
}
@ -632,6 +587,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAdded();
} catch (Throwable t) {
boolean removed = false;
try {
@ -674,33 +630,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
/**
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
* It is expected that the task performs any appropriate locking.
* <p>
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or
* {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is
* thrown instead.</p>
*
* @param future wait for this future
* @see Future#get()
* @throws Error if the task threw this.
* @throws RuntimeException if the task threw this.
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
* {@link Throwable}.
*/
private static void waitForFuture(Future<?> future) {
try {
future.get();
} catch (ExecutionException ex) {
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
PlatformDependent.throwException(ex.getCause());
} catch (InterruptedException ex) {
// Interrupt the calling thread (note that this method is not called from the event loop)
Thread.currentThread().interrupt();
}
}
@Override
public ChannelHandler first() {
ChannelHandlerContext first = firstContext();
@ -1228,6 +1157,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAdded();
}
@Override
@ -1299,6 +1229,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAdded();
}
@Override

View File

@ -33,6 +33,7 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.After;
import org.junit.AfterClass;
@ -700,16 +701,19 @@ public class DefaultChannelPipelineTest {
assertTrue(removedQueue.isEmpty());
pipeline.channel().close().syncUninterruptibly();
assertHandler(handler1, addedQueue.take());
assertHandler(handler2, addedQueue.take());
assertHandler(handler3, addedQueue.take());
assertHandler(handler4, addedQueue.take());
assertHandler(addedQueue.take(), handler1);
// Depending on timing this can be handler2 or handler3 as these use different EventExecutorGroups.
assertHandler(addedQueue.take(), handler2, handler3, handler4);
assertHandler(addedQueue.take(), handler2, handler3, handler4);
assertHandler(addedQueue.take(), handler2, handler3, handler4);
assertTrue(addedQueue.isEmpty());
assertHandler(handler4, removedQueue.take());
assertHandler(handler3, removedQueue.take());
assertHandler(handler2, removedQueue.take());
assertHandler(handler1, removedQueue.take());
assertHandler(removedQueue.take(), handler4);
assertHandler(removedQueue.take(), handler3);
assertHandler(removedQueue.take(), handler2);
assertHandler(removedQueue.take(), handler1);
assertTrue(removedQueue.isEmpty());
} finally {
group1.shutdownGracefully();
@ -793,39 +797,7 @@ public class DefaultChannelPipelineTest {
}
}
@Test(timeout = 3000)
public void testHandlerAddBlocksUntilHandlerAddedCalled() {
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
try {
final Promise<Void> promise = group1.next().newPromise();
ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel()).syncUninterruptibly();
pipeline.addLast(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
final AtomicBoolean handlerAddedCalled = new AtomicBoolean();
ctx.pipeline().addLast(group1, new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handlerAddedCalled.set(true);
}
});
if (handlerAddedCalled.get()) {
promise.setSuccess(null);
} else {
promise.setFailure(new AssertionError("handlerAdded(...) was not called yet"));
}
}
});
promise.syncUninterruptibly();
} finally {
group1.shutdownGracefully();
}
}
@Test
@Test(timeout = 2000)
public void testAddRemoveHandlerCalledOnceRegistered() throws Throwable {
ChannelPipeline pipeline = new LocalChannel().pipeline();
CallbackCheckHandler handler = new CallbackCheckHandler();
@ -833,8 +805,8 @@ public class DefaultChannelPipelineTest {
pipeline.addFirst(handler);
pipeline.remove(handler);
assertFalse(handler.addedHandler.get());
assertFalse(handler.removedHandler.get());
assertNull(handler.addedHandler.getNow());
assertNull(handler.removedHandler.getNow());
group.register(pipeline.channel()).syncUninterruptibly();
Throwable cause = handler.error.get();
@ -846,7 +818,7 @@ public class DefaultChannelPipelineTest {
assertTrue(handler.removedHandler.get());
}
@Test
@Test(timeout = 3000)
public void testAddReplaceHandlerCalledOnceRegistered() throws Throwable {
ChannelPipeline pipeline = new LocalChannel().pipeline();
CallbackCheckHandler handler = new CallbackCheckHandler();
@ -855,10 +827,10 @@ public class DefaultChannelPipelineTest {
pipeline.addFirst(handler);
pipeline.replace(handler, null, handler2);
assertFalse(handler.addedHandler.get());
assertFalse(handler.removedHandler.get());
assertFalse(handler2.addedHandler.get());
assertFalse(handler2.removedHandler.get());
assertNull(handler.addedHandler.getNow());
assertNull(handler.removedHandler.getNow());
assertNull(handler2.addedHandler.getNow());
assertNull(handler2.removedHandler.getNow());
group.register(pipeline.channel()).syncUninterruptibly();
Throwable cause = handler.error.get();
@ -875,30 +847,71 @@ public class DefaultChannelPipelineTest {
}
assertTrue(handler2.addedHandler.get());
assertFalse(handler2.removedHandler.get());
assertNull(handler2.removedHandler.getNow());
pipeline.remove(handler2);
assertTrue(handler2.removedHandler.get());
}
@Test(timeout = 3000)
public void testAddBefore() throws Throwable {
ChannelPipeline pipeline1 = new LocalChannel().pipeline();
ChannelPipeline pipeline2 = new LocalChannel().pipeline();
EventLoopGroup defaultGroup = new DefaultEventLoopGroup(2);
try {
EventLoop eventLoop1 = defaultGroup.next();
EventLoop eventLoop2 = defaultGroup.next();
eventLoop1.register(pipeline1.channel()).syncUninterruptibly();
eventLoop2.register(pipeline2.channel()).syncUninterruptibly();
CountDownLatch latch = new CountDownLatch(2 * 10);
for (int i = 0; i < 10; i++) {
eventLoop1.execute(new TestTask(pipeline2, latch));
eventLoop2.execute(new TestTask(pipeline1, latch));
}
latch.await();
} finally {
defaultGroup.shutdownGracefully();
}
}
private static final class TestTask implements Runnable {
private final ChannelPipeline pipeline;
private final CountDownLatch latch;
TestTask(ChannelPipeline pipeline, CountDownLatch latch) {
this.pipeline = pipeline;
this.latch = latch;
}
@Override
public void run() {
pipeline.addLast(new ChannelInboundHandlerAdapter());
latch.countDown();
}
}
private static final class CallbackCheckHandler extends ChannelHandlerAdapter {
final AtomicBoolean addedHandler = new AtomicBoolean();
final AtomicBoolean removedHandler = new AtomicBoolean();
final Promise<Boolean> addedHandler = ImmediateEventExecutor.INSTANCE.newPromise();
final Promise<Boolean> removedHandler = ImmediateEventExecutor.INSTANCE.newPromise();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (!addedHandler.compareAndSet(false, true)) {
if (!addedHandler.trySuccess(true)) {
error.set(new AssertionError("handlerAdded(...) called multiple times: " + ctx.name()));
} else if (removedHandler.get()) {
} else if (removedHandler.getNow() == Boolean.TRUE) {
error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name()));
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (!removedHandler.compareAndSet(false, true)) {
if (!removedHandler.trySuccess(true)) {
error.set(new AssertionError("handlerRemoved(...) called multiple times: " + ctx.name()));
} else if (!addedHandler.get()) {
} else if (addedHandler.getNow() == Boolean.FALSE) {
error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name()));
}
}
@ -923,9 +936,14 @@ public class DefaultChannelPipelineTest {
}
}
private static void assertHandler(CheckOrderHandler expected, CheckOrderHandler actual) throws Throwable {
assertSame(expected, actual);
actual.checkError();
private static void assertHandler(CheckOrderHandler actual, CheckOrderHandler... handlers) throws Throwable {
for (CheckOrderHandler h : handlers) {
if (h == actual) {
actual.checkError();
return;
}
}
fail("handler was not one of the expected handlers");
}
private static final class CheckOrderHandler extends ChannelHandlerAdapter {