[#5104] Fix possible deadlock in DefaultChannelPipeline
Motivation: When a user has multiple EventLoops in an EventLoopGroup and calls pipeline.add* / remove* / replace from an EventLoop that belongs to another Channel it is possible to deadlock if the other EventLoop does the same. Modification: - Only ensure the actual modification takes place in a synchronized block and not wait until the handlerAdded(...) / handlerRemoved(...) method is called. This is ok as we submit the task to the executor while still holding the look and so ensure correct order of pipeline modifications. - Ensure if an AbstractChannelHandlerContext is put in the linked-list structure but the handlerAdded(...) method was not called we skip it until handlerAdded(...) was called. This is needed to ensure handlerAdded(...) is always called first. Result: Its not possible to deadlock when modify the DefaultChannelPipeline.
This commit is contained in:
parent
f91d89e426
commit
f984870ccc
@ -34,16 +34,30 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
volatile AbstractChannelHandlerContext next;
|
||||
volatile AbstractChannelHandlerContext prev;
|
||||
|
||||
/**
|
||||
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
|
||||
*/
|
||||
private static final int ADDED = 1;
|
||||
/**
|
||||
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
|
||||
*/
|
||||
private static final int REMOVED = 2;
|
||||
/**
|
||||
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
|
||||
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
|
||||
*/
|
||||
private static final int INIT = 0;
|
||||
|
||||
private final boolean inbound;
|
||||
private final boolean outbound;
|
||||
private final DefaultChannelPipeline pipeline;
|
||||
private final String name;
|
||||
private boolean handlerRemoved;
|
||||
|
||||
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||
// child executor.
|
||||
final EventExecutor executor;
|
||||
private ChannelFuture succeededFuture;
|
||||
private int handlerState = INIT;
|
||||
|
||||
// Lazily instantiated tasks used to trigger events to a handler with different executor.
|
||||
// These needs to be volatile as otherwise an other Thread may see an half initialized instance.
|
||||
@ -114,10 +128,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeChannelRegistered() {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelRegistered(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelRegistered(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
fireChannelRegistered();
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,10 +157,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeChannelUnregistered() {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelUnregistered(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelUnregistered(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
fireChannelUnregistered();
|
||||
}
|
||||
}
|
||||
|
||||
@ -164,10 +186,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeChannelActive() {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelActive(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelActive(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
fireChannelActive();
|
||||
}
|
||||
}
|
||||
|
||||
@ -189,10 +215,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeChannelInactive() {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelInactive(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelInactive(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
fireChannelInactive();
|
||||
}
|
||||
}
|
||||
|
||||
@ -226,14 +256,18 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeExceptionCaught(final Throwable cause) {
|
||||
try {
|
||||
handler().exceptionCaught(this, cause);
|
||||
} catch (Throwable t) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"An exception was thrown by a user handler's " +
|
||||
"exceptionCaught() method while handling the following exception:", cause);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
handler().exceptionCaught(this, cause);
|
||||
} catch (Throwable t) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"An exception was thrown by a user handler's " +
|
||||
"exceptionCaught() method while handling the following exception:", cause);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fireExceptionCaught(cause);
|
||||
}
|
||||
}
|
||||
|
||||
@ -259,10 +293,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeUserEventTriggered(Object event) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
fireUserEventTriggered(event);
|
||||
}
|
||||
}
|
||||
|
||||
@ -288,10 +326,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeChannelRead(Object msg) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelRead(this, msg);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelRead(this, msg);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
fireChannelRead(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@ -317,10 +359,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeChannelReadComplete() {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelReadComplete(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelReadComplete(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
fireChannelReadComplete();
|
||||
}
|
||||
}
|
||||
|
||||
@ -346,10 +392,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeChannelWritabilityChanged() {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
fireChannelWritabilityChanged();
|
||||
}
|
||||
}
|
||||
|
||||
@ -409,10 +459,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
}
|
||||
} else {
|
||||
bind(localAddress, promise);
|
||||
}
|
||||
}
|
||||
|
||||
@ -449,10 +503,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
}
|
||||
} else {
|
||||
connect(remoteAddress, localAddress, promise);
|
||||
}
|
||||
}
|
||||
|
||||
@ -489,10 +547,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeDisconnect(ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).disconnect(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).disconnect(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
}
|
||||
} else {
|
||||
disconnect(promise);
|
||||
}
|
||||
}
|
||||
|
||||
@ -520,10 +582,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeClose(ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).close(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).close(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
}
|
||||
} else {
|
||||
close(promise);
|
||||
}
|
||||
}
|
||||
|
||||
@ -551,10 +617,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeDeregister(ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).deregister(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).deregister(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
}
|
||||
} else {
|
||||
deregister(promise);
|
||||
}
|
||||
}
|
||||
|
||||
@ -581,10 +651,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeRead() {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).read(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
if (isAdded()) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).read(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
}
|
||||
} else {
|
||||
read();
|
||||
}
|
||||
}
|
||||
|
||||
@ -615,6 +689,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeWrite(Object msg, ChannelPromise promise) {
|
||||
if (isAdded()) {
|
||||
invokeWrite0(msg, promise);
|
||||
} else {
|
||||
write(msg, promise);
|
||||
}
|
||||
}
|
||||
|
||||
private void invokeWrite0(Object msg, ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).write(this, msg, promise);
|
||||
} catch (Throwable t) {
|
||||
@ -645,6 +727,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
|
||||
private void invokeFlush() {
|
||||
if (isAdded()) {
|
||||
invokeFlush0();
|
||||
} else {
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
private void invokeFlush0() {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).flush(this);
|
||||
} catch (Throwable t) {
|
||||
@ -669,13 +759,23 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
|
||||
if (isAdded()) {
|
||||
invokeWrite0(msg, promise);
|
||||
invokeFlush0();
|
||||
} else {
|
||||
writeAndFlush(msg, promise);
|
||||
}
|
||||
}
|
||||
|
||||
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
||||
AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeWrite(msg, promise);
|
||||
if (flush) {
|
||||
next.invokeFlush();
|
||||
next.invokeWriteAndFlush(msg, promise);
|
||||
} else {
|
||||
next.invokeWrite(msg, promise);
|
||||
}
|
||||
} else {
|
||||
AbstractWriteTask task;
|
||||
@ -816,13 +916,29 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
return channel().voidPromise();
|
||||
}
|
||||
|
||||
void setRemoved() {
|
||||
handlerRemoved = true;
|
||||
final void setRemoved() {
|
||||
handlerState = REMOVED;
|
||||
}
|
||||
|
||||
final void setAdded() {
|
||||
handlerState = ADDED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
|
||||
* yet. If not return {@code false} and if called or could not detect return {@code true}.
|
||||
*
|
||||
* If this method returns {@code true} we will not invoke the {@link ChannelHandler} but just forward the event.
|
||||
* This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
|
||||
* but not called {@link }
|
||||
*/
|
||||
private boolean isAdded() {
|
||||
return handlerState == ADDED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRemoved() {
|
||||
return handlerRemoved;
|
||||
return handlerState == REMOVED;
|
||||
}
|
||||
|
||||
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
|
||||
|
@ -21,7 +21,6 @@ import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
@ -35,8 +34,6 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
/**
|
||||
@ -128,7 +125,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final EventExecutor executor;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
checkMultiplicity(handler);
|
||||
@ -136,33 +132,27 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
newCtx = newContext(group, name, handler);
|
||||
executor = executorSafe(newCtx.executor);
|
||||
|
||||
addFirst0(newCtx);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we add the context to the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerAdded(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
addFirst0(newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
return this;
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
addFirst0(newCtx);
|
||||
|
||||
if (!executor.inEventLoop()) {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
});
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
if (inEventLoop) {
|
||||
callHandlerAdded0(newCtx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
addFirst0(newCtx);
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -183,7 +173,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||
final EventExecutor executor;
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
checkMultiplicity(handler);
|
||||
@ -191,32 +180,26 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
newCtx = newContext(group, name, handler);
|
||||
executor = executorSafe(newCtx.executor);
|
||||
|
||||
addLast0(newCtx);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we add the context to the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerAdded(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
addLast0(newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
return this;
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
addLast0(newCtx);
|
||||
if (!executor.inEventLoop()) {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
});
|
||||
return this;
|
||||
}
|
||||
}
|
||||
if (inEventLoop) {
|
||||
callHandlerAdded0(newCtx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
addLast0(newCtx);
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -239,7 +222,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
final EventExecutor executor;
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final AbstractChannelHandlerContext ctx;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
checkMultiplicity(handler);
|
||||
ctx = getContextOrDie(baseName);
|
||||
@ -248,34 +230,27 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
newCtx = newContext(group, name, handler);
|
||||
executor = executorSafe(newCtx.executor);
|
||||
|
||||
addBefore0(ctx, newCtx);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we add the context to the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerAdded(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
addBefore0(ctx, newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
return this;
|
||||
}
|
||||
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
addBefore0(ctx, newCtx);
|
||||
if (!executor.inEventLoop()) {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
});
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
if (inEventLoop) {
|
||||
callHandlerAdded0(newCtx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
addBefore0(ctx, newCtx);
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -297,7 +272,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
final EventExecutor executor;
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final AbstractChannelHandlerContext ctx;
|
||||
final boolean inEventLoop;
|
||||
|
||||
synchronized (this) {
|
||||
checkMultiplicity(handler);
|
||||
@ -307,32 +281,26 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
newCtx = newContext(group, name, handler);
|
||||
executor = executorSafe(newCtx.executor);
|
||||
|
||||
addAfter0(ctx, newCtx);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we remove the context from the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerRemoved(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
addAfter0(ctx, newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
return this;
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
addAfter0(ctx, newCtx);
|
||||
if (!executor.inEventLoop()) {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
});
|
||||
return this;
|
||||
}
|
||||
}
|
||||
if (inEventLoop) {
|
||||
callHandlerAdded0(newCtx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
addAfter0(ctx, newCtx);
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -442,36 +410,30 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
assert ctx != head && ctx != tail;
|
||||
|
||||
final EventExecutor executor;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
executor = executorSafe(ctx.executor);
|
||||
|
||||
remove0(ctx);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we remove the context from the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerRemoved(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
remove0(ctx);
|
||||
callHandlerCallbackLater(ctx, false);
|
||||
return ctx;
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
remove0(ctx);
|
||||
|
||||
if (!executor.inEventLoop()) {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
});
|
||||
return ctx;
|
||||
}
|
||||
}
|
||||
if (inEventLoop) {
|
||||
callHandlerRemoved0(ctx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
remove0(ctx);
|
||||
}
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
callHandlerRemoved0(ctx);
|
||||
return ctx;
|
||||
}
|
||||
|
||||
@ -522,7 +484,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final EventExecutor executor;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
checkMultiplicity(newHandler);
|
||||
boolean sameName = ctx.name().equals(newName);
|
||||
@ -533,42 +494,36 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
newCtx = newContext(ctx.executor, newName, newHandler);
|
||||
executor = executorSafe(ctx.executor);
|
||||
|
||||
replace0(ctx, newCtx);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we replace the context in the pipeline
|
||||
// and add a task that will call ChannelHandler.handlerAdded(...) and
|
||||
// ChannelHandler.handlerRemoved(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
replace0(ctx, newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
callHandlerCallbackLater(ctx, false);
|
||||
return ctx.handler();
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
replace0(ctx, newCtx);
|
||||
if (!executor.inEventLoop()) {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
|
||||
// those event handlers must be called after handlerAdded().
|
||||
callHandlerAdded0(newCtx);
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
});
|
||||
return ctx.handler();
|
||||
}
|
||||
}
|
||||
if (inEventLoop) {
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
|
||||
// event handlers must be called after handlerAdded().
|
||||
callHandlerAdded0(newCtx);
|
||||
callHandlerRemoved0(ctx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
replace0(ctx, newCtx);
|
||||
}
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
|
||||
// those event handlers must be called after handlerAdded().
|
||||
callHandlerAdded0(newCtx);
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
|
||||
// event handlers must be called after handlerAdded().
|
||||
callHandlerAdded0(newCtx);
|
||||
callHandlerRemoved0(ctx);
|
||||
return ctx.handler();
|
||||
}
|
||||
|
||||
@ -605,6 +560,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
|
||||
try {
|
||||
ctx.handler().handlerAdded(ctx);
|
||||
ctx.setAdded();
|
||||
} catch (Throwable t) {
|
||||
boolean removed = false;
|
||||
try {
|
||||
@ -647,33 +603,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
|
||||
* It is expected that the task performs any appropriate locking.
|
||||
* <p>
|
||||
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or
|
||||
* {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is
|
||||
* thrown instead.</p>
|
||||
*
|
||||
* @param future wait for this future
|
||||
* @see Future#get()
|
||||
* @throws Error if the task threw this.
|
||||
* @throws RuntimeException if the task threw this.
|
||||
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
||||
* {@link Throwable}.
|
||||
*/
|
||||
private static void waitForFuture(Future<?> future) {
|
||||
try {
|
||||
future.get();
|
||||
} catch (ExecutionException ex) {
|
||||
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
|
||||
PlatformDependent.throwException(ex.getCause());
|
||||
} catch (InterruptedException ex) {
|
||||
// Interrupt the calling thread (note that this method is not called from the event loop)
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandler first() {
|
||||
ChannelHandlerContext first = firstContext();
|
||||
@ -1176,6 +1105,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
TailContext(DefaultChannelPipeline pipeline) {
|
||||
super(pipeline, null, TAIL_NAME, true, false);
|
||||
setAdded();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1247,6 +1177,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
HeadContext(DefaultChannelPipeline pipeline) {
|
||||
super(pipeline, null, HEAD_NAME, false, true);
|
||||
unsafe = pipeline.channel().unsafe();
|
||||
setAdded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -34,6 +34,7 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
@ -687,16 +688,19 @@ public class DefaultChannelPipelineTest {
|
||||
|
||||
assertTrue(removedQueue.isEmpty());
|
||||
pipeline.channel().close().syncUninterruptibly();
|
||||
assertHandler(handler1, addedQueue.take());
|
||||
assertHandler(handler2, addedQueue.take());
|
||||
assertHandler(handler3, addedQueue.take());
|
||||
assertHandler(handler4, addedQueue.take());
|
||||
assertHandler(addedQueue.take(), handler1);
|
||||
|
||||
// Depending on timing this can be handler2 or handler3 as these use different EventExecutorGroups.
|
||||
assertHandler(addedQueue.take(), handler2, handler3, handler4);
|
||||
assertHandler(addedQueue.take(), handler2, handler3, handler4);
|
||||
assertHandler(addedQueue.take(), handler2, handler3, handler4);
|
||||
|
||||
assertTrue(addedQueue.isEmpty());
|
||||
|
||||
assertHandler(handler4, removedQueue.take());
|
||||
assertHandler(handler3, removedQueue.take());
|
||||
assertHandler(handler2, removedQueue.take());
|
||||
assertHandler(handler1, removedQueue.take());
|
||||
assertHandler(removedQueue.take(), handler4);
|
||||
assertHandler(removedQueue.take(), handler3);
|
||||
assertHandler(removedQueue.take(), handler2);
|
||||
assertHandler(removedQueue.take(), handler1);
|
||||
assertTrue(removedQueue.isEmpty());
|
||||
} finally {
|
||||
group1.shutdownGracefully();
|
||||
@ -780,39 +784,7 @@ public class DefaultChannelPipelineTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testHandlerAddBlocksUntilHandlerAddedCalled() {
|
||||
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
|
||||
try {
|
||||
final Promise<Void> promise = group1.next().newPromise();
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
|
||||
pipeline.addLast(new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
final AtomicBoolean handlerAddedCalled = new AtomicBoolean();
|
||||
ctx.pipeline().addLast(group1, new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
handlerAddedCalled.set(true);
|
||||
}
|
||||
});
|
||||
if (handlerAddedCalled.get()) {
|
||||
promise.setSuccess(null);
|
||||
} else {
|
||||
promise.setFailure(new AssertionError("handlerAdded(...) was not called yet"));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
promise.syncUninterruptibly();
|
||||
} finally {
|
||||
group1.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 2000)
|
||||
public void testAddRemoveHandlerCalledOnceRegistered() throws Throwable {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
CallbackCheckHandler handler = new CallbackCheckHandler();
|
||||
@ -820,8 +792,8 @@ public class DefaultChannelPipelineTest {
|
||||
pipeline.addFirst(handler);
|
||||
pipeline.remove(handler);
|
||||
|
||||
assertFalse(handler.addedHandler.get());
|
||||
assertFalse(handler.removedHandler.get());
|
||||
assertNull(handler.addedHandler.getNow());
|
||||
assertNull(handler.removedHandler.getNow());
|
||||
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
Throwable cause = handler.error.get();
|
||||
@ -833,7 +805,7 @@ public class DefaultChannelPipelineTest {
|
||||
assertTrue(handler.removedHandler.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 3000)
|
||||
public void testAddReplaceHandlerCalledOnceRegistered() throws Throwable {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
CallbackCheckHandler handler = new CallbackCheckHandler();
|
||||
@ -842,10 +814,10 @@ public class DefaultChannelPipelineTest {
|
||||
pipeline.addFirst(handler);
|
||||
pipeline.replace(handler, "newHandler", handler2);
|
||||
|
||||
assertFalse(handler.addedHandler.get());
|
||||
assertFalse(handler.removedHandler.get());
|
||||
assertFalse(handler2.addedHandler.get());
|
||||
assertFalse(handler2.removedHandler.get());
|
||||
assertNull(handler.addedHandler.getNow());
|
||||
assertNull(handler.removedHandler.getNow());
|
||||
assertNull(handler2.addedHandler.getNow());
|
||||
assertNull(handler2.removedHandler.getNow());
|
||||
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
Throwable cause = handler.error.get();
|
||||
@ -862,30 +834,71 @@ public class DefaultChannelPipelineTest {
|
||||
}
|
||||
|
||||
assertTrue(handler2.addedHandler.get());
|
||||
assertFalse(handler2.removedHandler.get());
|
||||
assertNull(handler2.removedHandler.getNow());
|
||||
pipeline.remove(handler2);
|
||||
assertTrue(handler2.removedHandler.get());
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testAddBefore() throws Throwable {
|
||||
ChannelPipeline pipeline1 = new LocalChannel().pipeline();
|
||||
ChannelPipeline pipeline2 = new LocalChannel().pipeline();
|
||||
|
||||
EventLoopGroup defaultGroup = new LocalEventLoopGroup(2);
|
||||
try {
|
||||
EventLoop eventLoop1 = defaultGroup.next();
|
||||
EventLoop eventLoop2 = defaultGroup.next();
|
||||
|
||||
eventLoop1.register(pipeline1.channel()).syncUninterruptibly();
|
||||
eventLoop2.register(pipeline2.channel()).syncUninterruptibly();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(2 * 10);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
eventLoop1.execute(new TestTask(pipeline2, latch));
|
||||
eventLoop2.execute(new TestTask(pipeline1, latch));
|
||||
}
|
||||
latch.await();
|
||||
} finally {
|
||||
defaultGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class TestTask implements Runnable {
|
||||
|
||||
private final ChannelPipeline pipeline;
|
||||
private final CountDownLatch latch;
|
||||
|
||||
TestTask(ChannelPipeline pipeline, CountDownLatch latch) {
|
||||
this.pipeline = pipeline;
|
||||
this.latch = latch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.addLast(new ChannelInboundHandlerAdapter());
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class CallbackCheckHandler extends ChannelHandlerAdapter {
|
||||
final AtomicBoolean addedHandler = new AtomicBoolean();
|
||||
final AtomicBoolean removedHandler = new AtomicBoolean();
|
||||
final Promise<Boolean> addedHandler = ImmediateEventExecutor.INSTANCE.newPromise();
|
||||
final Promise<Boolean> removedHandler = ImmediateEventExecutor.INSTANCE.newPromise();
|
||||
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!addedHandler.compareAndSet(false, true)) {
|
||||
if (!addedHandler.trySuccess(true)) {
|
||||
error.set(new AssertionError("handlerAdded(...) called multiple times: " + ctx.name()));
|
||||
} else if (removedHandler.get()) {
|
||||
} else if (removedHandler.getNow() == Boolean.TRUE) {
|
||||
error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!removedHandler.compareAndSet(false, true)) {
|
||||
if (!removedHandler.trySuccess(true)) {
|
||||
error.set(new AssertionError("handlerRemoved(...) called multiple times: " + ctx.name()));
|
||||
} else if (!addedHandler.get()) {
|
||||
} else if (addedHandler.getNow() == Boolean.FALSE) {
|
||||
error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name()));
|
||||
}
|
||||
}
|
||||
@ -910,9 +923,14 @@ public class DefaultChannelPipelineTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertHandler(CheckOrderHandler expected, CheckOrderHandler actual) throws Throwable {
|
||||
assertSame(expected, actual);
|
||||
actual.checkError();
|
||||
private static void assertHandler(CheckOrderHandler actual, CheckOrderHandler... handlers) throws Throwable {
|
||||
for (CheckOrderHandler h : handlers) {
|
||||
if (h == actual) {
|
||||
actual.checkError();
|
||||
return;
|
||||
}
|
||||
}
|
||||
fail("handler was not one of the expected handlers");
|
||||
}
|
||||
|
||||
private static final class CheckOrderHandler extends ChannelHandlerAdapter {
|
||||
|
Loading…
Reference in New Issue
Block a user