Ensure handlerAdded(...) and handlerRemoved(...) is always called from the right thread
Motiviation: We should ensure that handlerAdded(...) and handlerRemoved(...) is always called from the EventExecutor that also invokes the other methods of the ChannelHandler. Also we need to ensure we always call handlerAdded(...) before any other method can be calld to ensure correct ordering. Motifications: - Ensure that the right thread is used to call the methods - Ensure correct ordering - Add tests Result: Respect the thread-model for handlerAdded(...) and handlerRemoved(...) and preserve correct ordering in all cases.
This commit is contained in:
parent
eb0724d7f5
commit
b6882a5d52
@ -56,7 +56,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
private final Channel parent;
|
||||
private final long hashCode = ThreadLocalRandom.current().nextLong();
|
||||
private final Unsafe unsafe;
|
||||
private final ChannelPipeline pipeline;
|
||||
private final DefaultChannelPipeline pipeline;
|
||||
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
|
||||
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
|
||||
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
|
||||
@ -438,6 +438,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
doRegister();
|
||||
neverRegistered = false;
|
||||
registered = true;
|
||||
|
||||
if (firstRegistration) {
|
||||
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
|
||||
// that were added before the registration was done.
|
||||
pipeline.callHandlerAddedForAllHandlers();
|
||||
}
|
||||
|
||||
safeSetSuccess(promise);
|
||||
pipeline.fireChannelRegistered();
|
||||
// Only fire a channelActive if the channel has never been registered. This prevents firing
|
||||
|
@ -40,21 +40,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
private final String name;
|
||||
private boolean handlerRemoved;
|
||||
|
||||
/**
|
||||
* This is set to {@code true} once the {@link ChannelHandler#handlerAdded(ChannelHandlerContext) method is called.
|
||||
* We need to keep track of this to ensure we will never call another {@link ChannelHandler} method before
|
||||
* handlerAdded(...) is called to guard against ordering issues.
|
||||
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} MUST be the first
|
||||
* method that is called for handler when it becomes a part of the {@link ChannelPipeline} in all cases. Not doing
|
||||
* so may lead to unexpected side-effects as {@link ChannelHandler} implementations may need to do initialization
|
||||
* steps before a {@link ChannelHandler} can be used.
|
||||
*
|
||||
* See <a href="https://github.com/netty/netty/issues/4705">#4705</a>
|
||||
*
|
||||
* No need to mark volatile as this will be made visible as next/prev is volatile.
|
||||
*/
|
||||
private boolean handlerAdded;
|
||||
|
||||
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||
// child executor.
|
||||
final EventExecutor executor;
|
||||
@ -115,7 +100,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
public ChannelHandlerContext fireChannelRegistered() {
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelRegistered();
|
||||
} else {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@ -140,7 +125,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
public ChannelHandlerContext fireChannelUnregistered() {
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelUnregistered();
|
||||
} else {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@ -165,7 +150,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
public ChannelHandlerContext fireChannelActive() {
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelActive();
|
||||
} else {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@ -190,7 +175,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
public ChannelHandlerContext fireChannelInactive() {
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelInactive();
|
||||
} else {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@ -220,7 +205,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
final AbstractChannelHandlerContext next = this.next;
|
||||
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeExceptionCaught(cause);
|
||||
} else {
|
||||
try {
|
||||
@ -260,7 +245,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeUserEventTriggered(event);
|
||||
} else {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@ -289,7 +274,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelRead(msg);
|
||||
} else {
|
||||
executor.execute(new OneTimeTask() {
|
||||
@ -314,7 +299,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
public ChannelHandlerContext fireChannelReadComplete() {
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelReadComplete();
|
||||
} else {
|
||||
Runnable task = next.invokeChannelReadCompleteTask;
|
||||
@ -343,7 +328,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
||||
final AbstractChannelHandlerContext next = findContextInbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelWritabilityChanged();
|
||||
} else {
|
||||
Runnable task = next.invokeChannelWritableStateChangedTask;
|
||||
@ -410,7 +395,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
|
||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeBind(localAddress, promise);
|
||||
} else {
|
||||
safeExecute(executor, new OneTimeTask() {
|
||||
@ -450,7 +435,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
|
||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeConnect(remoteAddress, localAddress, promise);
|
||||
} else {
|
||||
safeExecute(executor, new OneTimeTask() {
|
||||
@ -480,7 +465,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
|
||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
|
||||
// So far, UDP/IP is the only transport that has such behavior.
|
||||
if (!channel().metadata().hasDisconnect()) {
|
||||
@ -520,7 +505,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
|
||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeClose(promise);
|
||||
} else {
|
||||
safeExecute(executor, new OneTimeTask() {
|
||||
@ -551,7 +536,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
|
||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeDeregister(promise);
|
||||
} else {
|
||||
safeExecute(executor, new OneTimeTask() {
|
||||
@ -577,7 +562,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
public ChannelHandlerContext read() {
|
||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeRead();
|
||||
} else {
|
||||
Runnable task = next.invokeReadTask;
|
||||
@ -636,7 +621,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
public ChannelHandlerContext flush() {
|
||||
final AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeFlush();
|
||||
} else {
|
||||
Runnable task = next.invokeFlushTask;
|
||||
@ -682,7 +667,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
||||
AbstractChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeWrite(msg, promise);
|
||||
if (flush) {
|
||||
next.invokeFlush();
|
||||
@ -835,14 +820,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
return handlerRemoved;
|
||||
}
|
||||
|
||||
final void setHandlerAddedCalled() {
|
||||
handlerAdded = true;
|
||||
}
|
||||
|
||||
final boolean isHandlerAddedCalled() {
|
||||
return handlerAdded;
|
||||
}
|
||||
|
||||
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
|
||||
try {
|
||||
executor.execute(runnable);
|
||||
|
@ -37,6 +37,7 @@ import java.util.NoSuchElementException;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
/**
|
||||
* The default {@link ChannelPipeline} implementation. It is usually created
|
||||
@ -61,6 +62,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
private Map<EventExecutorGroup, EventExecutor> childExecutors;
|
||||
|
||||
/**
|
||||
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
|
||||
* all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
|
||||
*
|
||||
* We only keep the head because it is expected that the list is used infrequently and its size is small.
|
||||
* Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
|
||||
* complexity.
|
||||
*/
|
||||
private PendingHandlerCallback pendingHandlerCallbackHead;
|
||||
|
||||
/**
|
||||
* Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
|
||||
* change.
|
||||
*/
|
||||
private boolean registered;
|
||||
|
||||
public DefaultChannelPipeline(AbstractChannel channel) {
|
||||
if (channel == null) {
|
||||
throw new NullPointerException("channel");
|
||||
@ -68,9 +85,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
this.channel = channel;
|
||||
|
||||
tail = new TailContext(this);
|
||||
tail.setHandlerAddedCalled();
|
||||
head = new HeadContext(this);
|
||||
head.setHandlerAddedCalled();
|
||||
|
||||
head.next = tail;
|
||||
tail.prev = head;
|
||||
@ -110,23 +125,53 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||
checkDuplicateName(name);
|
||||
AbstractChannelHandlerContext newCtx = newContext(group, name, handler);
|
||||
addFirst0(newCtx);
|
||||
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final EventExecutor executor;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
checkMultiplicity(handler);
|
||||
|
||||
newCtx = newContext(group, name, handler);
|
||||
executor = executorSafe(newCtx.executor);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we add the context to the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerAdded(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
addFirst0(newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
return this;
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
addFirst0(newCtx);
|
||||
}
|
||||
}
|
||||
|
||||
if (inEventLoop) {
|
||||
callHandlerAdded0(newCtx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
addFirst0(newCtx);
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addFirst0(AbstractChannelHandlerContext newCtx) {
|
||||
checkMultiplicity(newCtx);
|
||||
|
||||
AbstractChannelHandlerContext nextCtx = head.next;
|
||||
newCtx.prev = head;
|
||||
newCtx.next = nextCtx;
|
||||
head.next = newCtx;
|
||||
nextCtx.prev = newCtx;
|
||||
|
||||
callHandlerAdded(newCtx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -135,23 +180,52 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||
checkDuplicateName(name);
|
||||
AbstractChannelHandlerContext newCtx = newContext(group, name, handler);
|
||||
addLast0(newCtx);
|
||||
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
|
||||
final EventExecutor executor;
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
checkMultiplicity(handler);
|
||||
|
||||
newCtx = newContext(group, name, handler);
|
||||
executor = executorSafe(newCtx.executor);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we add the context to the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerAdded(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
addLast0(newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
return this;
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
addLast0(newCtx);
|
||||
}
|
||||
}
|
||||
if (inEventLoop) {
|
||||
callHandlerAdded0(newCtx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
addLast0(newCtx);
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addLast0(AbstractChannelHandlerContext newCtx) {
|
||||
checkMultiplicity(newCtx);
|
||||
|
||||
AbstractChannelHandlerContext prev = tail.prev;
|
||||
newCtx.prev = prev;
|
||||
newCtx.next = tail;
|
||||
prev.next = newCtx;
|
||||
tail.prev = newCtx;
|
||||
|
||||
callHandlerAdded(newCtx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -160,24 +234,56 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ChannelPipeline addBefore(
|
||||
public ChannelPipeline addBefore(
|
||||
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
||||
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||
checkDuplicateName(name);
|
||||
AbstractChannelHandlerContext newCtx = newContext(group, name, handler);
|
||||
addBefore0(ctx, newCtx);
|
||||
final EventExecutor executor;
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final AbstractChannelHandlerContext ctx;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
checkMultiplicity(handler);
|
||||
ctx = getContextOrDie(baseName);
|
||||
checkDuplicateName(name);
|
||||
|
||||
newCtx = newContext(group, name, handler);
|
||||
executor = executorSafe(newCtx.executor);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we add the context to the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerAdded(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
addBefore0(ctx, newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
return this;
|
||||
}
|
||||
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
addBefore0(ctx, newCtx);
|
||||
}
|
||||
}
|
||||
|
||||
if (inEventLoop) {
|
||||
callHandlerAdded0(newCtx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
addBefore0(ctx, newCtx);
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||
checkMultiplicity(newCtx);
|
||||
|
||||
private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||
newCtx.prev = ctx.prev;
|
||||
newCtx.next = ctx;
|
||||
ctx.prev.next = newCtx;
|
||||
ctx.prev = newCtx;
|
||||
|
||||
callHandlerAdded(newCtx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -186,24 +292,55 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ChannelPipeline addAfter(
|
||||
public ChannelPipeline addAfter(
|
||||
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
|
||||
AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||
checkDuplicateName(name);
|
||||
AbstractChannelHandlerContext newCtx = newContext(group, name, handler);
|
||||
addAfter0(ctx, newCtx);
|
||||
final EventExecutor executor;
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final AbstractChannelHandlerContext ctx;
|
||||
final boolean inEventLoop;
|
||||
|
||||
synchronized (this) {
|
||||
checkMultiplicity(handler);
|
||||
ctx = getContextOrDie(baseName);
|
||||
checkDuplicateName(name);
|
||||
|
||||
newCtx = newContext(group, name, handler);
|
||||
executor = executorSafe(newCtx.executor);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we remove the context from the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerRemoved(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
addAfter0(ctx, newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
return this;
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
addAfter0(ctx, newCtx);
|
||||
}
|
||||
}
|
||||
if (inEventLoop) {
|
||||
callHandlerAdded0(newCtx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
addAfter0(ctx, newCtx);
|
||||
}
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||
checkMultiplicity(newCtx);
|
||||
|
||||
private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
|
||||
newCtx.prev = ctx;
|
||||
newCtx.next = ctx.next;
|
||||
ctx.next.prev = newCtx;
|
||||
ctx.next = newCtx;
|
||||
|
||||
callHandlerAdded(newCtx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -265,21 +402,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
cache.put(handlerType, name);
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
// It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
|
||||
// any name conflicts. Note that we don't cache the names generated here.
|
||||
if (context0(name) != null) {
|
||||
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
|
||||
for (int i = 1;; i ++) {
|
||||
String newName = baseName + i;
|
||||
if (context0(newName) == null) {
|
||||
name = newName;
|
||||
break;
|
||||
}
|
||||
// It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
|
||||
// any name conflicts. Note that we don't cache the names generated here.
|
||||
if (context0(name) != null) {
|
||||
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
|
||||
for (int i = 1;; i ++) {
|
||||
String newName = baseName + i;
|
||||
if (context0(newName) == null) {
|
||||
name = newName;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return name;
|
||||
}
|
||||
|
||||
@ -307,40 +441,45 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
|
||||
assert ctx != head && ctx != tail;
|
||||
|
||||
AbstractChannelHandlerContext context;
|
||||
Future<?> future;
|
||||
|
||||
final EventExecutor executor;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
if (!isExecuteLater(ctx)) {
|
||||
executor = executorSafe(ctx.executor);
|
||||
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we remove the context from the pipeline and add a task that will call
|
||||
// ChannelHandler.handlerRemoved(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
remove0(ctx);
|
||||
callHandlerCallbackLater(ctx, false);
|
||||
return ctx;
|
||||
} else {
|
||||
future = ctx.executor().submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
remove0(ctx);
|
||||
}
|
||||
}
|
||||
});
|
||||
context = ctx;
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
remove0(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
waitForFuture(future);
|
||||
|
||||
return context;
|
||||
if (inEventLoop) {
|
||||
callHandlerRemoved0(ctx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
remove0(ctx);
|
||||
}
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
return ctx;
|
||||
}
|
||||
|
||||
void remove0(AbstractChannelHandlerContext ctx) {
|
||||
private static void remove0(AbstractChannelHandlerContext ctx) {
|
||||
AbstractChannelHandlerContext prev = ctx.prev;
|
||||
AbstractChannelHandlerContext next = ctx.next;
|
||||
prev.next = next;
|
||||
next.prev = prev;
|
||||
callHandlerRemoved(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -378,46 +517,62 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
|
||||
private ChannelHandler replace(
|
||||
final AbstractChannelHandlerContext ctx, final String newName,
|
||||
ChannelHandler newHandler) {
|
||||
|
||||
final AbstractChannelHandlerContext ctx, final String newName, ChannelHandler newHandler) {
|
||||
assert ctx != head && ctx != tail;
|
||||
|
||||
Future<?> future;
|
||||
final AbstractChannelHandlerContext newCtx;
|
||||
final EventExecutor executor;
|
||||
final boolean inEventLoop;
|
||||
synchronized (this) {
|
||||
checkMultiplicity(newHandler);
|
||||
boolean sameName = ctx.name().equals(newName);
|
||||
if (!sameName) {
|
||||
checkDuplicateName(newName);
|
||||
}
|
||||
|
||||
final AbstractChannelHandlerContext newCtx = newContext(ctx.executor, newName, newHandler);
|
||||
newCtx = newContext(ctx.executor, newName, newHandler);
|
||||
executor = executorSafe(ctx.executor);
|
||||
|
||||
if (!isExecuteLater(newCtx)) {
|
||||
// If the executor is null it means that the channel was not registered on an eventloop yet.
|
||||
// In this case we replace the context in the pipeline
|
||||
// and add a task that will call ChannelHandler.handlerAdded(...) and
|
||||
// ChannelHandler.handlerRemoved(...) once the channel is registered.
|
||||
if (executor == null) {
|
||||
replace0(ctx, newCtx);
|
||||
callHandlerCallbackLater(newCtx, true);
|
||||
callHandlerCallbackLater(ctx, false);
|
||||
return ctx.handler();
|
||||
} else {
|
||||
future = newCtx.executor().submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
replace0(ctx, newCtx);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
inEventLoop = executor.inEventLoop();
|
||||
if (inEventLoop) {
|
||||
replace0(ctx, newCtx);
|
||||
}
|
||||
}
|
||||
|
||||
// Run the following 'waiting' code outside of the above synchronized block
|
||||
// in order to avoid deadlock
|
||||
|
||||
waitForFuture(future);
|
||||
|
||||
if (inEventLoop) {
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
|
||||
// event handlers must be called after handlerAdded().
|
||||
callHandlerAdded0(newCtx);
|
||||
callHandlerRemoved0(ctx);
|
||||
} else {
|
||||
waitForFuture(executor.submit(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (DefaultChannelPipeline.this) {
|
||||
replace0(ctx, newCtx);
|
||||
}
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
|
||||
// those event handlers must be called after handlerAdded().
|
||||
callHandlerAdded0(newCtx);
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
}));
|
||||
}
|
||||
return ctx.handler();
|
||||
}
|
||||
|
||||
private void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
|
||||
checkMultiplicity(newCtx);
|
||||
|
||||
private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
|
||||
AbstractChannelHandlerContext prev = oldCtx.prev;
|
||||
AbstractChannelHandlerContext next = oldCtx.next;
|
||||
newCtx.prev = prev;
|
||||
@ -433,16 +588,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
// update the reference to the replacement so forward of buffered content will work correctly
|
||||
oldCtx.prev = newCtx;
|
||||
oldCtx.next = newCtx;
|
||||
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger inboundBufferUpdated() or flush() on newHandler and those
|
||||
// event handlers must be called after handlerAdded().
|
||||
callHandlerAdded(newCtx);
|
||||
callHandlerRemoved(oldCtx);
|
||||
}
|
||||
|
||||
private static void checkMultiplicity(ChannelHandlerContext ctx) {
|
||||
ChannelHandler handler = ctx.handler();
|
||||
private static void checkMultiplicity(ChannelHandler handler) {
|
||||
if (handler instanceof ChannelHandlerAdapter) {
|
||||
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
|
||||
if (!h.isSharable() && h.added) {
|
||||
@ -454,31 +602,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
private void callHandlerAdded(final AbstractChannelHandlerContext ctx) {
|
||||
if (isExecuteLater(ctx)) {
|
||||
ctx.executor().execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(ctx);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
callHandlerAdded0(ctx);
|
||||
}
|
||||
|
||||
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
|
||||
try {
|
||||
try {
|
||||
ctx.handler().handlerAdded(ctx);
|
||||
} finally {
|
||||
// handlerAdded(...) method was called.
|
||||
ctx.setHandlerAddedCalled();
|
||||
}
|
||||
ctx.handler().handlerAdded(ctx);
|
||||
} catch (Throwable t) {
|
||||
boolean removed = false;
|
||||
try {
|
||||
remove((AbstractChannelHandlerContext) ctx);
|
||||
remove0(ctx);
|
||||
try {
|
||||
ctx.handler().handlerRemoved(ctx);
|
||||
} finally {
|
||||
ctx.setRemoved();
|
||||
}
|
||||
removed = true;
|
||||
} catch (Throwable t2) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
@ -498,34 +633,20 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) {
|
||||
if (isExecuteLater(ctx)) {
|
||||
ctx.executor().execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
|
||||
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
|
||||
// Notify the complete removal.
|
||||
try {
|
||||
ctx.handler().handlerRemoved(ctx);
|
||||
ctx.setRemoved();
|
||||
try {
|
||||
ctx.handler().handlerRemoved(ctx);
|
||||
} finally {
|
||||
ctx.setRemoved();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
fireExceptionCaught(new ChannelPipelineException(
|
||||
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isExecuteLater(ChannelHandlerContext ctx) {
|
||||
return ctx.channel().isRegistered() && !ctx.executor().inEventLoop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
|
||||
* It is expected that the task performs any appropriate locking.
|
||||
@ -747,7 +868,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
*
|
||||
* See: https://github.com/netty/netty/issues/3156
|
||||
*/
|
||||
private void destroy() {
|
||||
private synchronized void destroy() {
|
||||
destroyUp(head.next, false);
|
||||
}
|
||||
|
||||
@ -789,6 +910,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (inEventLoop || executor.inEventLoop(currentThread)) {
|
||||
synchronized (this) {
|
||||
remove0(ctx);
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
} else {
|
||||
final AbstractChannelHandlerContext finalCtx = ctx;
|
||||
@ -992,6 +1114,61 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Should be called before {@link #fireChannelRegistered()} is called the first time.
|
||||
*/
|
||||
void callHandlerAddedForAllHandlers() {
|
||||
// This should only called from within the EventLoop.
|
||||
assert channel.eventLoop().inEventLoop();
|
||||
|
||||
final PendingHandlerCallback pendingHandlerCallbackHead;
|
||||
synchronized (this) {
|
||||
assert !registered;
|
||||
|
||||
// This Channel itself was registered.
|
||||
registered = true;
|
||||
|
||||
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
|
||||
// Null out so it can be GC'ed.
|
||||
this.pendingHandlerCallbackHead = null;
|
||||
}
|
||||
|
||||
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
|
||||
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
|
||||
// the EventLoop.
|
||||
PendingHandlerCallback task = pendingHandlerCallbackHead;
|
||||
while (task != null) {
|
||||
task.execute();
|
||||
task = task.next;
|
||||
}
|
||||
}
|
||||
|
||||
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
|
||||
assert !registered;
|
||||
|
||||
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
|
||||
PendingHandlerCallback pending = pendingHandlerCallbackHead;
|
||||
if (pending == null) {
|
||||
pendingHandlerCallbackHead = task;
|
||||
} else {
|
||||
// Find the tail of the linked-list.
|
||||
while (pending.next != null) {
|
||||
pending = pending.next;
|
||||
}
|
||||
pending.next = task;
|
||||
}
|
||||
}
|
||||
|
||||
private EventExecutor executorSafe(EventExecutor eventExecutor) {
|
||||
if (eventExecutor == null) {
|
||||
// We check for channel().isRegistered and handlerAdded because even if isRegistered() is false we
|
||||
// can safely access the eventLoop() if handlerAdded is true. This is because in this case the Channel
|
||||
// was previously registered and so we can still access the old EventLoop to dispatch things.
|
||||
return channel.isRegistered() || registered ? channel.eventLoop() : null;
|
||||
}
|
||||
return eventExecutor;
|
||||
}
|
||||
|
||||
// A special catch-all handler that handles both bytes and messages.
|
||||
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
|
||||
|
||||
@ -1137,4 +1314,79 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
ctx.fireExceptionCaught(cause);
|
||||
}
|
||||
}
|
||||
|
||||
private abstract static class PendingHandlerCallback extends OneTimeTask {
|
||||
final AbstractChannelHandlerContext ctx;
|
||||
PendingHandlerCallback next;
|
||||
|
||||
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
abstract void execute();
|
||||
}
|
||||
|
||||
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
|
||||
|
||||
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
void execute() {
|
||||
EventExecutor executor = ctx.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
callHandlerAdded0(ctx);
|
||||
} else {
|
||||
try {
|
||||
executor.execute(this);
|
||||
} catch (RejectedExecutionException e) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
|
||||
executor, ctx.name(), e);
|
||||
}
|
||||
remove0(ctx);
|
||||
ctx.setRemoved();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
|
||||
|
||||
PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
void execute() {
|
||||
EventExecutor executor = ctx.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
callHandlerRemoved0(ctx);
|
||||
} else {
|
||||
try {
|
||||
executor.execute(this);
|
||||
} catch (RejectedExecutionException e) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
|
||||
" removing handler {}.", executor, ctx.name(), e);
|
||||
}
|
||||
// remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
|
||||
ctx.setRemoved();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,8 +30,11 @@ import io.netty.util.AbstractReferenceCounted;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import io.netty.util.concurrent.AbstractEventExecutor;
|
||||
import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
@ -41,10 +44,13 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
@ -595,6 +601,415 @@ public class DefaultChannelPipelineTest {
|
||||
assertTrue(handlerLatch.await(2, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testAddHandlerBeforeRegisteredThenRemove() {
|
||||
final EventLoop loop = group.next();
|
||||
|
||||
CheckEventExecutorHandler handler = new CheckEventExecutorHandler(loop);
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
pipeline.addFirst(handler);
|
||||
assertFalse(handler.addedPromise.isDone());
|
||||
group.register(pipeline.channel());
|
||||
handler.addedPromise.syncUninterruptibly();
|
||||
pipeline.remove(handler);
|
||||
handler.removedPromise.syncUninterruptibly();
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testAddHandlerBeforeRegisteredThenReplace() throws Exception {
|
||||
final EventLoop loop = group.next();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
CheckEventExecutorHandler handler = new CheckEventExecutorHandler(loop);
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
pipeline.addFirst(handler);
|
||||
assertFalse(handler.addedPromise.isDone());
|
||||
group.register(pipeline.channel());
|
||||
handler.addedPromise.syncUninterruptibly();
|
||||
pipeline.replace(handler, "newHandler", new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
handler.removedPromise.syncUninterruptibly();
|
||||
latch.await();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddRemoveHandlerNotRegistered() throws Throwable {
|
||||
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
||||
ChannelHandler handler = new ErrorChannelHandler(error);
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
pipeline.addFirst(handler);
|
||||
pipeline.remove(handler);
|
||||
|
||||
Throwable cause = error.get();
|
||||
if (cause != null) {
|
||||
throw cause;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddReplaceHandlerNotRegistered() throws Throwable {
|
||||
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
||||
ChannelHandler handler = new ErrorChannelHandler(error);
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
pipeline.addFirst(handler);
|
||||
pipeline.replace(handler, "newHandler", new ErrorChannelHandler(error));
|
||||
|
||||
Throwable cause = error.get();
|
||||
if (cause != null) {
|
||||
throw cause;
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testHandlerAddedAndRemovedCalledInCorrectOrder() throws Throwable {
|
||||
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
|
||||
final EventExecutorGroup group2 = new DefaultEventExecutorGroup(1);
|
||||
|
||||
try {
|
||||
BlockingQueue<CheckOrderHandler> addedQueue = new LinkedBlockingQueue<CheckOrderHandler>();
|
||||
BlockingQueue<CheckOrderHandler> removedQueue = new LinkedBlockingQueue<CheckOrderHandler>();
|
||||
|
||||
CheckOrderHandler handler1 = new CheckOrderHandler(addedQueue, removedQueue);
|
||||
CheckOrderHandler handler2 = new CheckOrderHandler(addedQueue, removedQueue);
|
||||
CheckOrderHandler handler3 = new CheckOrderHandler(addedQueue, removedQueue);
|
||||
CheckOrderHandler handler4 = new CheckOrderHandler(addedQueue, removedQueue);
|
||||
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
pipeline.addLast(handler1);
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
pipeline.addLast(group1, handler2);
|
||||
pipeline.addLast(group2, handler3);
|
||||
pipeline.addLast(handler4);
|
||||
|
||||
assertTrue(removedQueue.isEmpty());
|
||||
pipeline.channel().close().syncUninterruptibly();
|
||||
assertHandler(handler1, addedQueue.take());
|
||||
assertHandler(handler2, addedQueue.take());
|
||||
assertHandler(handler3, addedQueue.take());
|
||||
assertHandler(handler4, addedQueue.take());
|
||||
assertTrue(addedQueue.isEmpty());
|
||||
|
||||
assertHandler(handler4, removedQueue.take());
|
||||
assertHandler(handler3, removedQueue.take());
|
||||
assertHandler(handler2, removedQueue.take());
|
||||
assertHandler(handler1, removedQueue.take());
|
||||
assertTrue(removedQueue.isEmpty());
|
||||
} finally {
|
||||
group1.shutdownGracefully();
|
||||
group2.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testHandlerAddedExceptionFromChildHandlerIsPropegated() {
|
||||
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
|
||||
try {
|
||||
final Promise<Void> promise = group1.next().newPromise();
|
||||
final AtomicBoolean handlerAdded = new AtomicBoolean();
|
||||
final Exception exception = new RuntimeException();
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
pipeline.addLast(new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
handlerAdded.set(true);
|
||||
throw exception;
|
||||
}
|
||||
});
|
||||
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
|
||||
assertFalse(handlerAdded.get());
|
||||
group.register(pipeline.channel());
|
||||
promise.syncUninterruptibly();
|
||||
} finally {
|
||||
group1.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testHandlerRemovedExceptionFromChildHandlerIsPropegated() {
|
||||
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
|
||||
try {
|
||||
final Promise<Void> promise = group1.next().newPromise();
|
||||
String handlerName = "foo";
|
||||
final Exception exception = new RuntimeException();
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
pipeline.addLast(handlerName, new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
throw exception;
|
||||
}
|
||||
});
|
||||
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
pipeline.remove(handlerName);
|
||||
promise.syncUninterruptibly();
|
||||
} finally {
|
||||
group1.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testHandlerAddedThrowsAndRemovedThrowsException() {
|
||||
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
|
||||
try {
|
||||
final Promise<Void> promise = group1.next().newPromise();
|
||||
final Exception exceptionAdded = new RuntimeException();
|
||||
final Exception exceptionRemoved = new RuntimeException();
|
||||
String handlerName = "foo";
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
pipeline.addLast(handlerName, new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
throw exceptionAdded;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
throw exceptionRemoved;
|
||||
}
|
||||
});
|
||||
pipeline.addLast(group1, new CheckExceptionHandler(exceptionAdded, promise));
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
assertNull(pipeline.context(handlerName));
|
||||
promise.syncUninterruptibly();
|
||||
} finally {
|
||||
group1.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000)
|
||||
public void testHandlerAddBlocksUntilHandlerAddedCalled() {
|
||||
final EventExecutorGroup group1 = new DefaultEventExecutorGroup(1);
|
||||
try {
|
||||
final Promise<Void> promise = group1.next().newPromise();
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
|
||||
pipeline.addLast(new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
final AtomicBoolean handlerAddedCalled = new AtomicBoolean();
|
||||
ctx.pipeline().addLast(group1, new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
handlerAddedCalled.set(true);
|
||||
}
|
||||
});
|
||||
if (handlerAddedCalled.get()) {
|
||||
promise.setSuccess(null);
|
||||
} else {
|
||||
promise.setFailure(new AssertionError("handlerAdded(...) was not called yet"));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
promise.syncUninterruptibly();
|
||||
} finally {
|
||||
group1.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddRemoveHandlerCalledOnceRegistered() throws Throwable {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
CallbackCheckHandler handler = new CallbackCheckHandler();
|
||||
|
||||
pipeline.addFirst(handler);
|
||||
pipeline.remove(handler);
|
||||
|
||||
assertFalse(handler.addedHandler.get());
|
||||
assertFalse(handler.removedHandler.get());
|
||||
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
Throwable cause = handler.error.get();
|
||||
if (cause != null) {
|
||||
throw cause;
|
||||
}
|
||||
|
||||
assertTrue(handler.addedHandler.get());
|
||||
assertTrue(handler.removedHandler.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddReplaceHandlerCalledOnceRegistered() throws Throwable {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
CallbackCheckHandler handler = new CallbackCheckHandler();
|
||||
CallbackCheckHandler handler2 = new CallbackCheckHandler();
|
||||
|
||||
pipeline.addFirst(handler);
|
||||
pipeline.replace(handler, "newHandler", handler2);
|
||||
|
||||
assertFalse(handler.addedHandler.get());
|
||||
assertFalse(handler.removedHandler.get());
|
||||
assertFalse(handler2.addedHandler.get());
|
||||
assertFalse(handler2.removedHandler.get());
|
||||
|
||||
group.register(pipeline.channel()).syncUninterruptibly();
|
||||
Throwable cause = handler.error.get();
|
||||
if (cause != null) {
|
||||
throw cause;
|
||||
}
|
||||
|
||||
assertTrue(handler.addedHandler.get());
|
||||
assertTrue(handler.removedHandler.get());
|
||||
|
||||
Throwable cause2 = handler2.error.get();
|
||||
if (cause2 != null) {
|
||||
throw cause2;
|
||||
}
|
||||
|
||||
assertTrue(handler2.addedHandler.get());
|
||||
assertFalse(handler2.removedHandler.get());
|
||||
pipeline.remove(handler2);
|
||||
assertTrue(handler2.removedHandler.get());
|
||||
}
|
||||
|
||||
private static final class CallbackCheckHandler extends ChannelHandlerAdapter {
|
||||
final AtomicBoolean addedHandler = new AtomicBoolean();
|
||||
final AtomicBoolean removedHandler = new AtomicBoolean();
|
||||
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!addedHandler.compareAndSet(false, true)) {
|
||||
error.set(new AssertionError("handlerAdded(...) called multiple times: " + ctx.name()));
|
||||
} else if (removedHandler.get()) {
|
||||
error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!removedHandler.compareAndSet(false, true)) {
|
||||
error.set(new AssertionError("handlerRemoved(...) called multiple times: " + ctx.name()));
|
||||
} else if (!addedHandler.get()) {
|
||||
error.set(new AssertionError("handlerRemoved(...) called before handlerAdded(...): " + ctx.name()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class CheckExceptionHandler extends ChannelInboundHandlerAdapter {
|
||||
private final Throwable expected;
|
||||
private final Promise<Void> promise;
|
||||
|
||||
CheckExceptionHandler(Throwable expected, Promise<Void> promise) {
|
||||
this.expected = expected;
|
||||
this.promise = promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
if (cause instanceof ChannelPipelineException && cause.getCause() == expected) {
|
||||
promise.setSuccess(null);
|
||||
} else {
|
||||
promise.setFailure(new AssertionError("cause not the expected instance"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertHandler(CheckOrderHandler expected, CheckOrderHandler actual) throws Throwable {
|
||||
assertSame(expected, actual);
|
||||
actual.checkError();
|
||||
}
|
||||
|
||||
private static final class CheckOrderHandler extends ChannelHandlerAdapter {
|
||||
private final Queue<CheckOrderHandler> addedQueue;
|
||||
private final Queue<CheckOrderHandler> removedQueue;
|
||||
private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
||||
|
||||
CheckOrderHandler(Queue<CheckOrderHandler> addedQueue, Queue<CheckOrderHandler> removedQueue) {
|
||||
this.addedQueue = addedQueue;
|
||||
this.removedQueue = removedQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
addedQueue.add(this);
|
||||
checkExecutor(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
removedQueue.add(this);
|
||||
checkExecutor(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
error.set(cause);
|
||||
}
|
||||
|
||||
void checkError() throws Throwable {
|
||||
Throwable cause = error.get();
|
||||
if (cause != null) {
|
||||
throw cause;
|
||||
}
|
||||
}
|
||||
|
||||
private void checkExecutor(ChannelHandlerContext ctx) {
|
||||
if (!ctx.executor().inEventLoop()) {
|
||||
error.set(new AssertionError());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class CheckEventExecutorHandler extends ChannelHandlerAdapter {
|
||||
final EventExecutor executor;
|
||||
final Promise<Void> addedPromise;
|
||||
final Promise<Void> removedPromise;
|
||||
|
||||
CheckEventExecutorHandler(EventExecutor executor) {
|
||||
this.executor = executor;
|
||||
addedPromise = executor.newPromise();
|
||||
removedPromise = executor.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
assertExecutor(ctx, addedPromise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
assertExecutor(ctx, removedPromise);
|
||||
}
|
||||
|
||||
private void assertExecutor(ChannelHandlerContext ctx, Promise<Void> promise) {
|
||||
final boolean same;
|
||||
try {
|
||||
same = executor == ctx.executor();
|
||||
} catch (Throwable cause) {
|
||||
promise.setFailure(cause);
|
||||
return;
|
||||
}
|
||||
if (same) {
|
||||
promise.setSuccess(null);
|
||||
} else {
|
||||
promise.setFailure(new AssertionError("EventExecutor not the same"));
|
||||
}
|
||||
}
|
||||
}
|
||||
private static final class ErrorChannelHandler extends ChannelHandlerAdapter {
|
||||
private final AtomicReference<Throwable> error;
|
||||
|
||||
ErrorChannelHandler(AtomicReference<Throwable> error) {
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
error.set(new AssertionError());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
error.set(new AssertionError());
|
||||
}
|
||||
}
|
||||
|
||||
private static int next(AbstractChannelHandlerContext ctx) {
|
||||
AbstractChannelHandlerContext next = ctx.next;
|
||||
if (next == null) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user