diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java
index 0a033e55ab..e6c53a0a69 100644
--- a/transport/src/main/java/io/netty/channel/AbstractChannel.java
+++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java
@@ -55,7 +55,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
- private final ChannelPipeline pipeline;
+ private final DefaultChannelPipeline pipeline;
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
@@ -498,6 +498,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
doRegister();
neverRegistered = false;
registered = true;
+
+ if (firstRegistration) {
+ // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
+ // that were added before the registration was done.
+ pipeline.callHandlerAddedForAllHandlers();
+ }
+
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
index e12f0f054e..6bb2df1969 100644
--- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
+++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java
@@ -20,7 +20,6 @@ import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
@@ -36,22 +35,6 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
private final String name;
private boolean handlerRemoved;
- /**
- * This is set to {@code true} once the {@link ChannelHandler#handlerAdded(ChannelHandlerContext) method was called.
- * We need to keep track of this This will set to true once the
- * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} method was called. We need to keep track of this
- * to ensure we will never call another {@link ChannelHandler} method before handlerAdded(...) was called
- * to guard againstordering issues. {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} MUST be the first
- * method that is called for handler when it becomes a part of the {@link ChannelPipeline} in all cases. Not doing
- * so may lead to unexpected side-effects as {@link ChannelHandler} implementationsmay need to do initialization
- * steps before a {@link ChannelHandler} can be used.
- *
- * See #4705
- *
- * No need to mark volatile as this will be made visible as next/prev is volatile.
- */
- private boolean handlerAdded;
-
final ChannelHandlerInvoker invoker;
private ChannelFuture succeededFuture;
@@ -344,276 +327,78 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
}
final void invokeChannelRegistered() {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeChannelRegistered(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeChannelRegistered(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker().invokeChannelRegistered(this);
}
final void invokeChannelUnregistered() {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeChannelUnregistered(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeChannelUnregistered(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker().invokeChannelUnregistered(this);
}
final void invokeChannelActive() {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeChannelActive(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeChannelActive(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker().invokeChannelActive(this);
}
final void invokeChannelInactive() {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeChannelInactive(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeChannelInactive(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker().invokeChannelInactive(this);
}
final void invokeExceptionCaught(final Throwable cause) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeExceptionCaught(this, cause);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeExceptionCaught(AbstractChannelHandlerContext.this, cause);
- }
- });
- }
+ invoker().invokeExceptionCaught(this, cause);
}
final void invokeUserEventTriggered(final Object event) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeUserEventTriggered(this, event);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeUserEventTriggered(AbstractChannelHandlerContext.this, event);
- }
- });
- }
+ invoker().invokeUserEventTriggered(this, event);
}
final void invokeChannelRead(final Object msg) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeChannelRead(this, msg);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeChannelRead(AbstractChannelHandlerContext.this, msg);
- }
- });
- }
+ invoker().invokeChannelRead(this, msg);
}
final void invokeChannelReadComplete() {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeChannelReadComplete(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeChannelReadComplete(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker().invokeChannelReadComplete(this);
}
final void invokeChannelWritabilityChanged() {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeChannelWritabilityChanged(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker().invokeChannelWritabilityChanged(this);
}
final void invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeBind(this, localAddress, promise);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeBind(AbstractChannelHandlerContext.this, localAddress, promise);
- }
- });
- }
+ invoker().invokeBind(this, localAddress, promise);
}
final void invokeConnect(final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeConnect(this, remoteAddress, localAddress, promise);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeConnect(AbstractChannelHandlerContext.this, remoteAddress, localAddress, promise);
- }
- });
- }
+ invoker().invokeConnect(this, remoteAddress, localAddress, promise);
}
final void invokeDisconnect(final ChannelPromise promise) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeDisconnect(this, promise);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeDisconnect(AbstractChannelHandlerContext.this, promise);
- }
- });
- }
+ invoker().invokeDisconnect(this, promise);
}
final void invokeClose(final ChannelPromise promise) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeClose(this, promise);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeClose(AbstractChannelHandlerContext.this, promise);
- }
- });
- }
+ invoker().invokeClose(this, promise);
}
final void invokeDeregister(final ChannelPromise promise) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeDeregister(this, promise);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeDeregister(AbstractChannelHandlerContext.this, promise);
- }
- });
- }
+ invoker().invokeDeregister(this, promise);
}
final void invokeRead() {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeRead(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeRead(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker().invokeRead(this);
}
final void invokeWrite(final Object msg, final ChannelPromise promise) {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeWrite(this, msg, promise);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeWrite(AbstractChannelHandlerContext.this, msg, promise);
- }
- });
- }
+ invoker().invokeWrite(this, msg, promise);
}
final void invokeFlush() {
- final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeFlush(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeFlush(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker().invokeFlush(this);
}
final void invokeWriteAndFlush(final Object msg, final ChannelPromise promise) {
final ChannelHandlerInvoker invoker = invoker();
- if (handlerAdded) {
- invoker.invokeWrite(this, msg, promise);
- invoker.invokeFlush(this);
- } else {
- invoker.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- assert handlerAdded;
- invoker.invokeWrite(AbstractChannelHandlerContext.this, msg, promise);
- invoker.invokeFlush(AbstractChannelHandlerContext.this);
- }
- });
- }
+ invoker.invokeWrite(this, msg, promise);
+ invoker.invokeFlush(this);
}
@Override
@@ -621,10 +406,6 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
return invoker == null ? channel().unsafe().invoker() : invoker;
}
- final void setHandlerAddedCalled() {
- handlerAdded = true;
- }
-
@Override
public String toHintString() {
return '\'' + name + "' will handle the message from this point.";
diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
index 03e8b235df..5e56a97406 100644
--- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
+++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java
@@ -38,6 +38,7 @@ import java.util.NoSuchElementException;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
/**
* The default {@link ChannelPipeline} implementation. It is usually created
@@ -67,6 +68,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
*/
private Map childInvokers;
+ /**
+ * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
+ * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
+ *
+ * We only keep the head because it is expected that the list is used infrequently and its size is small.
+ * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
+ * complexity.
+ */
+ private PendingHandlerCallback pendingHandlerCallbackHead;
+
+ /**
+ * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
+ * change.
+ */
+ private boolean registered;
+
DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
@@ -74,9 +91,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
this.channel = channel;
tail = new TailContext(this);
- tail.setHandlerAddedCalled();
head = new HeadContext(this);
- head.setHandlerAddedCalled();
head.next = tail;
tail.prev = head;
@@ -93,132 +108,286 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(String name, ChannelHandler handler) {
- return addFirst((ChannelHandlerInvoker) null, name, handler);
+ return addFirst(null, null, name, handler);
}
@Override
- public synchronized ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
- name = filterName(name, handler);
- addFirst0(new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
- return this;
+ public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
+ return addFirst(group, null, name, handler);
}
@Override
- public synchronized ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
- name = filterName(name, handler);
- addFirst0(new DefaultChannelHandlerContext(this, invoker, name, handler));
+ public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
+ return addFirst(null, invoker, name, handler);
+ }
+
+ private ChannelPipeline addFirst(
+ EventExecutorGroup group, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
+ final AbstractChannelHandlerContext newCtx;
+ final EventExecutor executor;
+ final boolean inEventLoop;
+ synchronized (this) {
+ checkMultiplicity(handler);
+
+ if (group != null) {
+ invoker = findInvoker(group);
+ }
+
+ newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
+ executor = executorSafe(invoker);
+
+ // If the executor is null it means that the channel was not registered on an eventloop yet.
+ // In this case we add the context to the pipeline and add a task that will call
+ // ChannelHandler.handlerAdded(...) once the channel is registered.
+ if (executor == null) {
+ addFirst0(newCtx);
+ callHandlerCallbackLater(newCtx, true);
+ return this;
+ }
+ inEventLoop = executor.inEventLoop();
+ if (inEventLoop) {
+ addFirst0(newCtx);
+ }
+ }
+
+ if (inEventLoop) {
+ callHandlerAdded0(newCtx);
+ } else {
+ waitForFuture(executor.submit(new OneTimeTask() {
+ @Override
+ public void run() {
+ synchronized (DefaultChannelPipeline.this) {
+ addFirst0(newCtx);
+ }
+ callHandlerAdded0(newCtx);
+ }
+ }));
+ }
return this;
}
private void addFirst0(AbstractChannelHandlerContext newCtx) {
- checkMultiplicity(newCtx);
-
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
-
- callHandlerAdded(newCtx);
}
@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
- return addLast((ChannelHandlerInvoker) null, name, handler);
+ return addLast(null, null, name, handler);
}
@Override
- public synchronized ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
- name = filterName(name, handler);
- addLast0(new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
- return this;
+ public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
+ return addLast(group, null, name, handler);
}
@Override
- public synchronized ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
- name = filterName(name, handler);
- addLast0(new DefaultChannelHandlerContext(this, invoker, name, handler));
+ public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
+ return addLast(null, invoker, name, handler);
+ }
+
+ private ChannelPipeline addLast(EventExecutorGroup group, ChannelHandlerInvoker invoker,
+ String name, ChannelHandler handler) {
+ assertGroupAndInvoker(group, invoker);
+
+ final EventExecutor executor;
+ final AbstractChannelHandlerContext newCtx;
+ final boolean inEventLoop;
+ synchronized (this) {
+ checkMultiplicity(handler);
+
+ if (group != null) {
+ invoker = findInvoker(group);
+ }
+
+ newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
+ executor = executorSafe(invoker);
+
+ // If the executor is null it means that the channel was not registered on an eventloop yet.
+ // In this case we add the context to the pipeline and add a task that will call
+ // ChannelHandler.handlerAdded(...) once the channel is registered.
+ if (executor == null) {
+ addLast0(newCtx);
+ callHandlerCallbackLater(newCtx, true);
+ return this;
+ }
+ inEventLoop = executor.inEventLoop();
+ if (inEventLoop) {
+ addLast0(newCtx);
+ }
+ }
+ if (inEventLoop) {
+ callHandlerAdded0(newCtx);
+ } else {
+ waitForFuture(executor.submit(new OneTimeTask() {
+ @Override
+ public void run() {
+ synchronized (DefaultChannelPipeline.this) {
+ addLast0(newCtx);
+ }
+ callHandlerAdded0(newCtx);
+ }
+ }));
+ }
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
- checkMultiplicity(newCtx);
-
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
-
- callHandlerAdded(newCtx);
}
@Override
public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
- return addBefore((ChannelHandlerInvoker) null, baseName, name, handler);
+ return addBefore(null, null, baseName, name, handler);
}
@Override
- public synchronized ChannelPipeline addBefore(
+ public ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
- AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
- name = filterName(name, handler);
- addBefore0(ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
- return this;
+ return addBefore(group, null, baseName, name, handler);
}
@Override
- public synchronized ChannelPipeline addBefore(
+ public ChannelPipeline addBefore(
ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
- AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
- name = filterName(name, handler);
- addBefore0(ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
+ return addBefore(null, invoker, baseName, name, handler);
+ }
+
+ private ChannelPipeline addBefore(EventExecutorGroup group,
+ ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
+ assertGroupAndInvoker(group, invoker);
+
+ final EventExecutor executor;
+ final AbstractChannelHandlerContext newCtx;
+ final AbstractChannelHandlerContext ctx;
+ final boolean inEventLoop;
+ synchronized (this) {
+ checkMultiplicity(handler);
+ ctx = getContextOrDie(baseName);
+
+ if (group != null) {
+ invoker = findInvoker(group);
+ }
+
+ newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
+ executor = executorSafe(invoker);
+
+ // If the executor is null it means that the channel was not registered on an eventloop yet.
+ // In this case we add the context to the pipeline and add a task that will call
+ // ChannelHandler.handlerAdded(...) once the channel is registered.
+ if (executor == null) {
+ addBefore0(ctx, newCtx);
+ callHandlerCallbackLater(newCtx, true);
+ return this;
+ }
+
+ inEventLoop = executor.inEventLoop();
+ if (inEventLoop) {
+ addBefore0(ctx, newCtx);
+ }
+ }
+
+ if (inEventLoop) {
+ callHandlerAdded0(newCtx);
+ } else {
+ waitForFuture(executor.submit(new OneTimeTask() {
+ @Override
+ public void run() {
+ synchronized (DefaultChannelPipeline.this) {
+ addBefore0(ctx, newCtx);
+ }
+ callHandlerAdded0(newCtx);
+ }
+ }));
+ }
return this;
}
- private void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
- checkMultiplicity(newCtx);
-
+ private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
newCtx.prev = ctx.prev;
newCtx.next = ctx;
ctx.prev.next = newCtx;
ctx.prev = newCtx;
-
- callHandlerAdded(newCtx);
}
@Override
public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
- return addAfter((ChannelHandlerInvoker) null, baseName, name, handler);
+ return addAfter(null, null, baseName, name, handler);
}
@Override
- public synchronized ChannelPipeline addAfter(
+ public ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
- AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
- name = filterName(name, handler);
- addAfter0(ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
- return this;
+ return addAfter(group, null, baseName, name, handler);
}
@Override
- public synchronized ChannelPipeline addAfter(
+ public ChannelPipeline addAfter(
ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
- AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
- name = filterName(name, handler);
- addAfter0(ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
+ return addAfter(null, invoker, baseName, name, handler);
+ }
+
+ private ChannelPipeline addAfter(EventExecutorGroup group,
+ ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
+ assertGroupAndInvoker(group, invoker);
+
+ final EventExecutor executor;
+ final AbstractChannelHandlerContext newCtx;
+ final AbstractChannelHandlerContext ctx;
+ final boolean inEventLoop;
+
+ synchronized (this) {
+ checkMultiplicity(handler);
+ ctx = getContextOrDie(baseName);
+
+ if (group != null) {
+ invoker = findInvoker(group);
+ }
+
+ newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
+ executor = executorSafe(invoker);
+
+ // If the executor is null it means that the channel was not registered on an eventloop yet.
+ // In this case we remove the context from the pipeline and add a task that will call
+ // ChannelHandler.handlerRemoved(...) once the channel is registered.
+ if (executor == null) {
+ addAfter0(ctx, newCtx);
+ callHandlerCallbackLater(newCtx, true);
+ return this;
+ }
+ inEventLoop = executor.inEventLoop();
+ if (inEventLoop) {
+ addAfter0(ctx, newCtx);
+ }
+ }
+ if (inEventLoop) {
+ callHandlerAdded0(newCtx);
+ } else {
+ waitForFuture(executor.submit(new OneTimeTask() {
+ @Override
+ public void run() {
+ synchronized (DefaultChannelPipeline.this) {
+ addAfter0(ctx, newCtx);
+ }
+ callHandlerAdded0(newCtx);
+ }
+ }));
+ }
return this;
}
- private void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
- checkMultiplicity(newCtx);
-
+ private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
newCtx.prev = ctx;
newCtx.next = ctx.next;
ctx.next.prev = newCtx;
ctx.next = newCtx;
-
- callHandlerAdded(newCtx);
}
@Override
@@ -244,7 +413,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
for (int i = size - 1; i >= 0; i --) {
ChannelHandler h = handlers[i];
- addFirst(group, generateName(h), h);
+ addFirst(group, null, h);
}
return this;
@@ -289,7 +458,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
if (h == null) {
break;
}
- addLast(group, generateName(h), h);
+ addLast(group, null, h);
}
return this;
@@ -311,7 +480,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
- // No need for synchronization because it is always executed in a synchronized(this) block.
private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
if (group == null) {
return null;
@@ -348,21 +516,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
cache.put(handlerType, name);
}
- synchronized (this) {
- // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
- // any name conflicts. Note that we don't cache the names generated here.
- if (context0(name) != null) {
- String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
- for (int i = 1;; i ++) {
- String newName = baseName + i;
- if (context0(newName) == null) {
- name = newName;
- break;
- }
+ // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
+ // any name conflicts. Note that we don't cache the names generated here.
+ if (context0(name) != null) {
+ String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
+ for (int i = 1;; i ++) {
+ String newName = baseName + i;
+ if (context0(newName) == null) {
+ name = newName;
+ break;
}
}
}
-
return name;
}
@@ -390,40 +555,45 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
- AbstractChannelHandlerContext context;
- Future> future;
-
+ final EventExecutor executor;
+ final boolean inEventLoop;
synchronized (this) {
- if (!isExecuteLater(ctx)) {
+ executor = executorSafe(ctx.invoker);
+
+ // If the executor is null it means that the channel was not registered on an eventloop yet.
+ // In this case we remove the context from the pipeline and add a task that will call
+ // ChannelHandler.handlerRemoved(...) once the channel is registered.
+ if (executor == null) {
remove0(ctx);
+ callHandlerCallbackLater(ctx, false);
return ctx;
- } else {
- future = ctx.executor().submit(new OneTimeTask() {
- @Override
- public void run() {
- synchronized (DefaultChannelPipeline.this) {
- remove0(ctx);
- }
- }
- });
- context = ctx;
+ }
+ inEventLoop = executor.inEventLoop();
+ if (inEventLoop) {
+ remove0(ctx);
}
}
-
- // Run the following 'waiting' code outside of the above synchronized block
- // in order to avoid deadlock
-
- waitForFuture(future);
-
- return context;
+ if (inEventLoop) {
+ callHandlerRemoved0(ctx);
+ } else {
+ waitForFuture(executor.submit(new OneTimeTask() {
+ @Override
+ public void run() {
+ synchronized (DefaultChannelPipeline.this) {
+ remove0(ctx);
+ }
+ callHandlerRemoved0(ctx);
+ }
+ }));
+ }
+ return ctx;
}
- private void remove0(AbstractChannelHandlerContext ctx) {
+ private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
- callHandlerRemoved(ctx);
}
@Override
@@ -462,46 +632,63 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private ChannelHandler replace(
final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
-
assert ctx != head && ctx != tail;
- Future> future;
+ final AbstractChannelHandlerContext newCtx;
+ final EventExecutor executor;
+ final boolean inEventLoop;
synchronized (this) {
+ checkMultiplicity(newHandler);
+
if (newName == null) {
newName = ctx.name();
} else if (!ctx.name().equals(newName)) {
newName = filterName(newName, newHandler);
}
- final AbstractChannelHandlerContext newCtx =
- new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
+ newCtx = new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
+ executor = executorSafe(ctx.invoker);
- if (!isExecuteLater(newCtx)) {
+ // If the executor is null it means that the channel was not registered on an eventloop yet.
+ // In this case we replace the context in the pipeline
+ // and add a task that will call ChannelHandler.handlerAdded(...) and
+ // ChannelHandler.handlerRemoved(...) once the channel is registered.
+ if (executor == null) {
replace0(ctx, newCtx);
+ callHandlerCallbackLater(newCtx, true);
+ callHandlerCallbackLater(ctx, false);
return ctx.handler();
- } else {
- future = newCtx.executor().submit(new OneTimeTask() {
- @Override
- public void run() {
- synchronized (DefaultChannelPipeline.this) {
- replace0(ctx, newCtx);
- }
- }
- });
+ }
+ inEventLoop = executor.inEventLoop();
+ if (inEventLoop) {
+ replace0(ctx, newCtx);
}
}
-
- // Run the following 'waiting' code outside of the above synchronized block
- // in order to avoid deadlock
-
- waitForFuture(future);
-
+ if (inEventLoop) {
+ // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
+ // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
+ // event handlers must be called after handlerAdded().
+ callHandlerAdded0(newCtx);
+ callHandlerRemoved0(ctx);
+ } else {
+ waitForFuture(executor.submit(new OneTimeTask() {
+ @Override
+ public void run() {
+ synchronized (DefaultChannelPipeline.this) {
+ replace0(ctx, newCtx);
+ }
+ // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
+ // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
+ // those event handlers must be called after handlerAdded().
+ callHandlerAdded0(newCtx);
+ callHandlerRemoved0(ctx);
+ }
+ }));
+ }
return ctx.handler();
}
- private void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
- checkMultiplicity(newCtx);
-
+ private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = oldCtx.prev;
AbstractChannelHandlerContext next = oldCtx.next;
newCtx.prev = prev;
@@ -517,16 +704,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// update the reference to the replacement so forward of buffered content will work correctly
oldCtx.prev = newCtx;
oldCtx.next = newCtx;
-
- // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
- // because callHandlerRemoved() will trigger inboundBufferUpdated() or flush() on newHandler and those
- // event handlers must be called after handlerAdded().
- callHandlerAdded(newCtx);
- callHandlerRemoved(oldCtx);
}
- private static void checkMultiplicity(ChannelHandlerContext ctx) {
- ChannelHandler handler = ctx.handler();
+ private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
@@ -538,31 +718,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
- private void callHandlerAdded(final AbstractChannelHandlerContext ctx) {
- if (isExecuteLater(ctx)) {
- ctx.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- callHandlerAdded0(ctx);
- }
- });
- return;
- }
- callHandlerAdded0(ctx);
- }
-
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
- try {
- ctx.handler().handlerAdded(ctx);
- } finally {
- // handlerAdded(...) method was called.
- ctx.setHandlerAddedCalled();
- }
+ ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
boolean removed = false;
try {
- remove(ctx);
+ remove0(ctx);
+ try {
+ ctx.handler().handlerRemoved(ctx);
+ } finally {
+ ctx.setRemoved();
+ }
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
@@ -582,34 +749,20 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
- private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) {
- if (isExecuteLater(ctx)) {
- ctx.executor().execute(new OneTimeTask() {
- @Override
- public void run() {
- callHandlerRemoved0(ctx);
- }
- });
- return;
- }
- callHandlerRemoved0(ctx);
- }
-
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
- ctx.handler().handlerRemoved(ctx);
- ctx.setRemoved();
+ try {
+ ctx.handler().handlerRemoved(ctx);
+ } finally {
+ ctx.setRemoved();
+ }
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
- private static boolean isExecuteLater(ChannelHandlerContext ctx) {
- return ctx.channel().isRegistered() && !ctx.executor().inEventLoop();
- }
-
/**
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
* It is expected that the task performs any appropriate locking.
@@ -831,7 +984,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
*
* See: https://github.com/netty/netty/issues/3156
*/
- private void destroy() {
+ private synchronized void destroy() {
destroyUp(head.next, false);
}
@@ -873,6 +1026,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
if (inEventLoop || executor.inEventLoop(currentThread)) {
synchronized (this) {
remove0(ctx);
+ callHandlerRemoved0(ctx);
}
} else {
final AbstractChannelHandlerContext finalCtx = ctx;
@@ -1082,6 +1236,65 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
+ /**
+ * Should be called before {@link #fireChannelRegistered()} is called the first time.
+ */
+ void callHandlerAddedForAllHandlers() {
+ // This should only called from within the EventLoop.
+ assert channel.eventLoop().inEventLoop();
+
+ final PendingHandlerCallback pendingHandlerCallbackHead;
+ synchronized (this) {
+ assert !registered;
+
+ // This Channel itself was registered.
+ registered = true;
+
+ pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
+ // Null out so it can be GC'ed.
+ this.pendingHandlerCallbackHead = null;
+ }
+
+ // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
+ // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
+ // the EventLoop.
+ PendingHandlerCallback task = pendingHandlerCallbackHead;
+ while (task != null) {
+ task.execute();
+ task = task.next;
+ }
+ }
+
+ private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
+ assert !registered;
+
+ PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
+ PendingHandlerCallback pending = pendingHandlerCallbackHead;
+ if (pending == null) {
+ pendingHandlerCallbackHead = task;
+ } else {
+ // Find the tail of the linked-list.
+ while (pending.next != null) {
+ pending = pending.next;
+ }
+ pending.next = task;
+ }
+ }
+
+ private EventExecutor executorSafe(ChannelHandlerInvoker invoker) {
+ if (invoker == null) {
+ // We check for channel().isRegistered and handlerAdded because even if isRegistered() is false we
+ // can safely access the invoker() if handlerAdded is true. This is because in this case the Channel
+ // was previously registered and so we can still access the old EventLoop to dispatch things.
+ return channel.isRegistered() || registered ? channel.unsafe().invoker().executor() : null;
+ }
+ return invoker.executor();
+ }
+
+ private static void assertGroupAndInvoker(EventExecutorGroup group, ChannelHandlerInvoker invoker) {
+ assert group == null || invoker == null : "either group or invoker must be null";
+ }
+
// A special catch-all handler that handles both bytes and messages.
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@@ -1223,4 +1436,79 @@ final class DefaultChannelPipeline implements ChannelPipeline {
unsafe.flush();
}
}
+
+ private abstract static class PendingHandlerCallback extends OneTimeTask {
+ final AbstractChannelHandlerContext ctx;
+ PendingHandlerCallback next;
+
+ PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ }
+
+ abstract void execute();
+ }
+
+ private final class PendingHandlerAddedTask extends PendingHandlerCallback {
+
+ PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
+ super(ctx);
+ }
+
+ @Override
+ public void run() {
+ callHandlerAdded0(ctx);
+ }
+
+ @Override
+ void execute() {
+ EventExecutor executor = ctx.executor();
+ if (executor.inEventLoop()) {
+ callHandlerAdded0(ctx);
+ } else {
+ try {
+ executor.execute(this);
+ } catch (RejectedExecutionException e) {
+ if (logger.isWarnEnabled()) {
+ logger.warn(
+ "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
+ executor, ctx.name(), e);
+ }
+ remove0(ctx);
+ ctx.setRemoved();
+ }
+ }
+ }
+ }
+
+ private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
+
+ PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
+ super(ctx);
+ }
+
+ @Override
+ public void run() {
+ callHandlerRemoved0(ctx);
+ }
+
+ @Override
+ void execute() {
+ EventExecutor executor = ctx.executor();
+ if (executor.inEventLoop()) {
+ callHandlerRemoved0(ctx);
+ } else {
+ try {
+ executor.execute(this);
+ } catch (RejectedExecutionException e) {
+ if (logger.isWarnEnabled()) {
+ logger.warn(
+ "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
+ " removing handler {}.", executor, ctx.name(), e);
+ }
+ // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
+ ctx.setRemoved();
+ }
+ }
+ }
+ }
}
diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java
index e54e206aa5..6380805b8a 100644
--- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java
+++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java
@@ -29,8 +29,11 @@ import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.AbstractEventExecutor;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
@@ -40,10 +43,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
@@ -608,6 +614,415 @@ public class DefaultChannelPipelineTest {
assertTrue(handlerLatch.await(2, TimeUnit.SECONDS));
}
+ @Test(timeout = 3000)
+ public void testAddHandlerBeforeRegisteredThenRemove() {
+ final EventLoop loop = group.next();
+
+ CheckEventExecutorHandler handler = new CheckEventExecutorHandler(loop);
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ pipeline.addFirst(handler);
+ assertFalse(handler.addedPromise.isDone());
+ group.register(pipeline.channel());
+ handler.addedPromise.syncUninterruptibly();
+ pipeline.remove(handler);
+ handler.removedPromise.syncUninterruptibly();
+ }
+
+ @Test(timeout = 3000)
+ public void testAddHandlerBeforeRegisteredThenReplace() throws Exception {
+ final EventLoop loop = group.next();
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ CheckEventExecutorHandler handler = new CheckEventExecutorHandler(loop);
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ pipeline.addFirst(handler);
+ assertFalse(handler.addedPromise.isDone());
+ group.register(pipeline.channel());
+ handler.addedPromise.syncUninterruptibly();
+ pipeline.replace(handler, null, new ChannelHandlerAdapter() {
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ latch.countDown();
+ }
+ });
+ handler.removedPromise.syncUninterruptibly();
+ latch.await();
+ }
+
+ @Test
+ public void testAddRemoveHandlerNotRegistered() throws Throwable {
+ final AtomicReference error = new AtomicReference();
+ ChannelHandler handler = new ErrorChannelHandler(error);
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ pipeline.addFirst(handler);
+ pipeline.remove(handler);
+
+ Throwable cause = error.get();
+ if (cause != null) {
+ throw cause;
+ }
+ }
+
+ @Test
+ public void testAddReplaceHandlerNotRegistered() throws Throwable {
+ final AtomicReference error = new AtomicReference();
+ ChannelHandler handler = new ErrorChannelHandler(error);
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ pipeline.addFirst(handler);
+ pipeline.replace(handler, null, new ErrorChannelHandler(error));
+
+ Throwable cause = error.get();
+ if (cause != null) {
+ throw cause;
+ }
+ }
+
+ @Test(timeout = 3000)
+ public void testHandlerAddedAndRemovedCalledInCorrectOrder() throws Throwable {
+ final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
+ final EventExecutorGroup group2 = new DefaultEventExecutorGroup(1);
+
+ try {
+ BlockingQueue addedQueue = new LinkedBlockingQueue();
+ BlockingQueue removedQueue = new LinkedBlockingQueue();
+
+ CheckOrderHandler handler1 = new CheckOrderHandler(addedQueue, removedQueue);
+ CheckOrderHandler handler2 = new CheckOrderHandler(addedQueue, removedQueue);
+ CheckOrderHandler handler3 = new CheckOrderHandler(addedQueue, removedQueue);
+ CheckOrderHandler handler4 = new CheckOrderHandler(addedQueue, removedQueue);
+
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ pipeline.addLast(handler1);
+ group.register(pipeline.channel()).syncUninterruptibly();
+ pipeline.addLast(group1, handler2);
+ pipeline.addLast(group2, handler3);
+ pipeline.addLast(handler4);
+
+ assertTrue(removedQueue.isEmpty());
+ pipeline.channel().close().syncUninterruptibly();
+ assertHandler(handler1, addedQueue.take());
+ assertHandler(handler2, addedQueue.take());
+ assertHandler(handler3, addedQueue.take());
+ assertHandler(handler4, addedQueue.take());
+ assertTrue(addedQueue.isEmpty());
+
+ assertHandler(handler4, removedQueue.take());
+ assertHandler(handler3, removedQueue.take());
+ assertHandler(handler2, removedQueue.take());
+ assertHandler(handler1, removedQueue.take());
+ assertTrue(removedQueue.isEmpty());
+ } finally {
+ group1.shutdownGracefully();
+ group2.shutdownGracefully();
+ }
+ }
+
+ @Test(timeout = 3000)
+ public void testHandlerAddedExceptionFromChildHandlerIsPropegated() {
+ final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
+ try {
+ final Promise promise = group1.next().newPromise();
+ final AtomicBoolean handlerAdded = new AtomicBoolean();
+ final Exception exception = new RuntimeException();
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ pipeline.addLast(new ChannelHandlerAdapter() {
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ handlerAdded.set(true);
+ throw exception;
+ }
+ });
+ pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
+ assertFalse(handlerAdded.get());
+ group.register(pipeline.channel());
+ promise.syncUninterruptibly();
+ } finally {
+ group1.shutdownGracefully();
+ }
+ }
+
+ @Test(timeout = 3000)
+ public void testHandlerRemovedExceptionFromChildHandlerIsPropegated() {
+ final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
+ try {
+ final Promise promise = group1.next().newPromise();
+ String handlerName = "foo";
+ final Exception exception = new RuntimeException();
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ pipeline.addLast(handlerName, new ChannelHandlerAdapter() {
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ throw exception;
+ }
+ });
+ pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
+ group.register(pipeline.channel()).syncUninterruptibly();
+ pipeline.remove(handlerName);
+ promise.syncUninterruptibly();
+ } finally {
+ group1.shutdownGracefully();
+ }
+ }
+
+ @Test(timeout = 3000)
+ public void testHandlerAddedThrowsAndRemovedThrowsException() {
+ final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
+ try {
+ final Promise promise = group1.next().newPromise();
+ final Exception exceptionAdded = new RuntimeException();
+ final Exception exceptionRemoved = new RuntimeException();
+ String handlerName = "foo";
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ pipeline.addLast(handlerName, new ChannelHandlerAdapter() {
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ throw exceptionAdded;
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ throw exceptionRemoved;
+ }
+ });
+ pipeline.addLast(group1, new CheckExceptionHandler(exceptionAdded, promise));
+ group.register(pipeline.channel()).syncUninterruptibly();
+ assertNull(pipeline.context(handlerName));
+ promise.syncUninterruptibly();
+ } finally {
+ group1.shutdownGracefully();
+ }
+ }
+
+ @Test(timeout = 3000)
+ public void testHandlerAddBlocksUntilHandlerAddedCalled() {
+ final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
+ try {
+ final Promise promise = group1.next().newPromise();
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ group.register(pipeline.channel()).syncUninterruptibly();
+
+ pipeline.addLast(new ChannelHandlerAdapter() {
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ final AtomicBoolean handlerAddedCalled = new AtomicBoolean();
+ ctx.pipeline().addLast(group1, new ChannelHandlerAdapter() {
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ handlerAddedCalled.set(true);
+ }
+ });
+ if (handlerAddedCalled.get()) {
+ promise.setSuccess(null);
+ } else {
+ promise.setFailure(new AssertionError("handlerAdded(...) was not called yet"));
+ }
+ }
+ });
+
+ promise.syncUninterruptibly();
+ } finally {
+ group1.shutdownGracefully();
+ }
+ }
+
+ @Test
+ public void testAddRemoveHandlerCalledOnceRegistered() throws Throwable {
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ CallbackCheckHandler handler = new CallbackCheckHandler();
+
+ pipeline.addFirst(handler);
+ pipeline.remove(handler);
+
+ assertFalse(handler.addedHandler.get());
+ assertFalse(handler.removedHandler.get());
+
+ group.register(pipeline.channel()).syncUninterruptibly();
+ Throwable cause = handler.error.get();
+ if (cause != null) {
+ throw cause;
+ }
+
+ assertTrue(handler.addedHandler.get());
+ assertTrue(handler.removedHandler.get());
+ }
+
+ @Test
+ public void testAddReplaceHandlerCalledOnceRegistered() throws Throwable {
+ ChannelPipeline pipeline = new LocalChannel().pipeline();
+ CallbackCheckHandler handler = new CallbackCheckHandler();
+ CallbackCheckHandler handler2 = new CallbackCheckHandler();
+
+ pipeline.addFirst(handler);
+ pipeline.replace(handler, null, handler2);
+
+ assertFalse(handler.addedHandler.get());
+ assertFalse(handler.removedHandler.get());
+ assertFalse(handler2.addedHandler.get());
+ assertFalse(handler2.removedHandler.get());
+
+ group.register(pipeline.channel()).syncUninterruptibly();
+ Throwable cause = handler.error.get();
+ if (cause != null) {
+ throw cause;
+ }
+
+ assertTrue(handler.addedHandler.get());
+ assertTrue(handler.removedHandler.get());
+
+ Throwable cause2 = handler2.error.get();
+ if (cause2 != null) {
+ throw cause2;
+ }
+
+ assertTrue(handler2.addedHandler.get());
+ assertFalse(handler2.removedHandler.get());
+ pipeline.remove(handler2);
+ assertTrue(handler2.removedHandler.get());
+ }
+
+ private static final class CallbackCheckHandler extends ChannelHandlerAdapter {
+ final AtomicBoolean addedHandler = new AtomicBoolean();
+ final AtomicBoolean removedHandler = new AtomicBoolean();
+ final AtomicReference error = new AtomicReference();
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ if (!addedHandler.compareAndSet(false, true)) {
+ error.set(new AssertionError("handlerAdded(...) called multiple times: " + ctx.name()));
+ } else if (removedHandler.get()) {
+ error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name()));
+ }
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ if (!removedHandler.compareAndSet(false, true)) {
+ error.set(new AssertionError("handlerRemoved(...) called multiple times: " + ctx.name()));
+ } else if (!addedHandler.get()) {
+ error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name()));
+ }
+ }
+ }
+
+ private static final class CheckExceptionHandler extends ChannelInboundHandlerAdapter {
+ private final Throwable expected;
+ private final Promise promise;
+
+ CheckExceptionHandler(Throwable expected, Promise promise) {
+ this.expected = expected;
+ this.promise = promise;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (cause instanceof ChannelPipelineException && cause.getCause() == expected) {
+ promise.setSuccess(null);
+ } else {
+ promise.setFailure(new AssertionError("cause not the expected instance"));
+ }
+ }
+ }
+
+ private static void assertHandler(CheckOrderHandler expected, CheckOrderHandler actual) throws Throwable {
+ assertSame(expected, actual);
+ actual.checkError();
+ }
+
+ private static final class CheckOrderHandler extends ChannelHandlerAdapter {
+ private final Queue addedQueue;
+ private final Queue removedQueue;
+ private final AtomicReference error = new AtomicReference();
+
+ CheckOrderHandler(Queue addedQueue, Queue removedQueue) {
+ this.addedQueue = addedQueue;
+ this.removedQueue = removedQueue;
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ addedQueue.add(this);
+ checkExecutor(ctx);
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ removedQueue.add(this);
+ checkExecutor(ctx);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ error.set(cause);
+ }
+
+ void checkError() throws Throwable {
+ Throwable cause = error.get();
+ if (cause != null) {
+ throw cause;
+ }
+ }
+
+ private void checkExecutor(ChannelHandlerContext ctx) {
+ if (!ctx.executor().inEventLoop()) {
+ error.set(new AssertionError());
+ }
+ }
+ }
+
+ private static final class CheckEventExecutorHandler extends ChannelHandlerAdapter {
+ final EventExecutor executor;
+ final Promise addedPromise;
+ final Promise removedPromise;
+
+ CheckEventExecutorHandler(EventExecutor executor) {
+ this.executor = executor;
+ addedPromise = executor.newPromise();
+ removedPromise = executor.newPromise();
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ assertExecutor(ctx, addedPromise);
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ assertExecutor(ctx, removedPromise);
+ }
+
+ private void assertExecutor(ChannelHandlerContext ctx, Promise promise) {
+ final boolean same;
+ try {
+ same = executor == ctx.executor();
+ } catch (Throwable cause) {
+ promise.setFailure(cause);
+ return;
+ }
+ if (same) {
+ promise.setSuccess(null);
+ } else {
+ promise.setFailure(new AssertionError("EventExecutor not the same"));
+ }
+ }
+ }
+ private static final class ErrorChannelHandler extends ChannelHandlerAdapter {
+ private final AtomicReference error;
+
+ ErrorChannelHandler(AtomicReference error) {
+ this.error = error;
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ error.set(new AssertionError());
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ error.set(new AssertionError());
+ }
+ }
+
private static int next(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext next = ctx.next;
if (next == null) {