Ensure ChannelHandler.handlerAdded(...) is always called as first method of the handler

Motivation:

If a user adds a ChannelHandler from outside the EventLoop it is possible to get into the situation that handlerAdded(...) is scheduled on the EventLoop and so called after another methods of the ChannelHandler as the EventLoop may already be executing on this point in time.

Modification:

- Ensure we always check if the handlerAdded(...) method was called already and if not add the currently needed call to the EventLoop so it will be picked up after handlerAdded(...) was called. This works as if the handler is added to the ChannelPipeline from outside the EventLoop the actual handlerAdded(...) operation is scheduled on the EventLoop.
- Some cleanup in the DefaultChannelPipeline

Result:

Correctly order of method executions of ChannelHandler.
This commit is contained in:
Norman Maurer 2016-01-15 14:23:41 +01:00
parent 52ba4f4ec7
commit ba381f1a27
2 changed files with 61 additions and 31 deletions

View File

@ -38,7 +38,22 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
private final boolean outbound; private final boolean outbound;
private final DefaultChannelPipeline pipeline; private final DefaultChannelPipeline pipeline;
private final String name; private final String name;
private boolean removed; private boolean handlerRemoved;
/**
* This is set to {@code true} once the {@link ChannelHandler#handlerAdded(ChannelHandlerContext) method is called.
* We need to keep track of this to ensure we will never call another {@link ChannelHandler} method before
* handlerAdded(...) is called to guard against ordering issues.
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} MUST be the first
* method that is called for handler when it becomes a part of the {@link ChannelPipeline} in all cases. Not doing
* so may lead to unexpected side-effects as {@link ChannelHandler} implementations may need to do initialization
* steps before a {@link ChannelHandler} can be used.
*
* See <a href="https://github.com/netty/netty/issues/4705">#4705</a>
*
* No need to mark volatile as this will be made visible as next/prev is volatile.
*/
private boolean handlerAdded;
// Will be set to null if no child executor should be used, otherwise it will be set to the // Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor. // child executor.
@ -100,7 +115,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
public ChannelHandlerContext fireChannelRegistered() { public ChannelHandlerContext fireChannelRegistered() {
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeChannelRegistered(); next.invokeChannelRegistered();
} else { } else {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@ -125,7 +140,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
public ChannelHandlerContext fireChannelUnregistered() { public ChannelHandlerContext fireChannelUnregistered() {
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeChannelUnregistered(); next.invokeChannelUnregistered();
} else { } else {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@ -150,7 +165,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
public ChannelHandlerContext fireChannelActive() { public ChannelHandlerContext fireChannelActive() {
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeChannelActive(); next.invokeChannelActive();
} else { } else {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@ -175,7 +190,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
public ChannelHandlerContext fireChannelInactive() { public ChannelHandlerContext fireChannelInactive() {
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeChannelInactive(); next.invokeChannelInactive();
} else { } else {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@ -205,7 +220,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
final AbstractChannelHandlerContext next = this.next; final AbstractChannelHandlerContext next = this.next;
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeExceptionCaught(cause); next.invokeExceptionCaught(cause);
} else { } else {
try { try {
@ -222,7 +237,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
} }
} }
} }
return this; return this;
} }
@ -246,7 +260,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeUserEventTriggered(event); next.invokeUserEventTriggered(event);
} else { } else {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@ -275,7 +289,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeChannelRead(msg); next.invokeChannelRead(msg);
} else { } else {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@ -300,7 +314,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
public ChannelHandlerContext fireChannelReadComplete() { public ChannelHandlerContext fireChannelReadComplete() {
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeChannelReadComplete(); next.invokeChannelReadComplete();
} else { } else {
Runnable task = next.invokeChannelReadCompleteTask; Runnable task = next.invokeChannelReadCompleteTask;
@ -329,7 +343,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
public ChannelHandlerContext fireChannelWritabilityChanged() { public ChannelHandlerContext fireChannelWritabilityChanged() {
final AbstractChannelHandlerContext next = findContextInbound(); final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeChannelWritabilityChanged(); next.invokeChannelWritabilityChanged();
} else { } else {
Runnable task = next.invokeChannelWritableStateChangedTask; Runnable task = next.invokeChannelWritableStateChangedTask;
@ -396,7 +410,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
final AbstractChannelHandlerContext next = findContextOutbound(); final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeBind(localAddress, promise); next.invokeBind(localAddress, promise);
} else { } else {
safeExecute(executor, new OneTimeTask() { safeExecute(executor, new OneTimeTask() {
@ -406,7 +420,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
} }
}, promise, null); }, promise, null);
} }
return promise; return promise;
} }
@ -437,7 +450,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
final AbstractChannelHandlerContext next = findContextOutbound(); final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise); next.invokeConnect(remoteAddress, localAddress, promise);
} else { } else {
safeExecute(executor, new OneTimeTask() { safeExecute(executor, new OneTimeTask() {
@ -447,7 +460,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
} }
}, promise, null); }, promise, null);
} }
return promise; return promise;
} }
@ -468,7 +480,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
final AbstractChannelHandlerContext next = findContextOutbound(); final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
// Translate disconnect to close if the channel has no notion of disconnect-reconnect. // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior. // So far, UDP/IP is the only transport that has such behavior.
if (!channel().metadata().hasDisconnect()) { if (!channel().metadata().hasDisconnect()) {
@ -488,7 +500,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
} }
}, promise, null); }, promise, null);
} }
return promise; return promise;
} }
@ -509,7 +520,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
final AbstractChannelHandlerContext next = findContextOutbound(); final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeClose(promise); next.invokeClose(promise);
} else { } else {
safeExecute(executor, new OneTimeTask() { safeExecute(executor, new OneTimeTask() {
@ -540,7 +551,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
final AbstractChannelHandlerContext next = findContextOutbound(); final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeDeregister(promise); next.invokeDeregister(promise);
} else { } else {
safeExecute(executor, new OneTimeTask() { safeExecute(executor, new OneTimeTask() {
@ -566,7 +577,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
public ChannelHandlerContext read() { public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound(); final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeRead(); next.invokeRead();
} else { } else {
Runnable task = next.invokeReadTask; Runnable task = next.invokeReadTask;
@ -625,7 +636,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
public ChannelHandlerContext flush() { public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound(); final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeFlush(); next.invokeFlush();
} else { } else {
Runnable task = next.invokeFlushTask; Runnable task = next.invokeFlushTask;
@ -671,7 +682,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
private void write(Object msg, boolean flush, ChannelPromise promise) { private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound(); AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (next.isHandlerAddedCalled() && executor.inEventLoop()) {
next.invokeWrite(msg, promise); next.invokeWrite(msg, promise);
if (flush) { if (flush) {
next.invokeFlush(); next.invokeFlush();
@ -816,12 +827,20 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
} }
void setRemoved() { void setRemoved() {
removed = true; handlerRemoved = true;
} }
@Override @Override
public boolean isRemoved() { public boolean isRemoved() {
return removed; return handlerRemoved;
}
final void setHandlerAddedCalled() {
handlerAdded = true;
}
final boolean isHandlerAddedCalled() {
return handlerAdded;
} }
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {

View File

@ -68,7 +68,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
this.channel = channel; this.channel = channel;
tail = new TailContext(this); tail = new TailContext(this);
tail.setHandlerAddedCalled();
head = new HeadContext(this); head = new HeadContext(this);
head.setHandlerAddedCalled();
head.next = tail; head.next = tail;
tail.prev = head; tail.prev = head;
@ -309,7 +311,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
Future<?> future; Future<?> future;
synchronized (this) { synchronized (this) {
if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) { if (!isExecuteLater(ctx)) {
remove0(ctx); remove0(ctx);
return ctx; return ctx;
} else { } else {
@ -390,7 +392,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext newCtx = newContext(ctx.executor, newName, newHandler); final AbstractChannelHandlerContext newCtx = newContext(ctx.executor, newName, newHandler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { if (!isExecuteLater(newCtx)) {
replace0(ctx, newCtx); replace0(ctx, newCtx);
return ctx.handler(); return ctx.handler();
} else { } else {
@ -452,8 +454,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private void callHandlerAdded(final ChannelHandlerContext ctx) { private void callHandlerAdded(final AbstractChannelHandlerContext ctx) {
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { if (isExecuteLater(ctx)) {
ctx.executor().execute(new OneTimeTask() { ctx.executor().execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
@ -465,9 +467,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
callHandlerAdded0(ctx); callHandlerAdded0(ctx);
} }
private void callHandlerAdded0(final ChannelHandlerContext ctx) { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
try { try {
ctx.handler().handlerAdded(ctx); ctx.handler().handlerAdded(ctx);
} finally {
// handlerAdded(...) method was called.
ctx.setHandlerAddedCalled();
}
} catch (Throwable t) { } catch (Throwable t) {
boolean removed = false; boolean removed = false;
try { try {
@ -492,7 +499,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) { private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) {
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { if (isExecuteLater(ctx)) {
ctx.executor().execute(new OneTimeTask() { ctx.executor().execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
@ -515,6 +522,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private static boolean isExecuteLater(ChannelHandlerContext ctx) {
return ctx.channel().isRegistered() && !ctx.executor().inEventLoop();
}
/** /**
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted. * Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
* It is expected that the task performs any appropriate locking. * It is expected that the task performs any appropriate locking.