Fix small race in DefaultChannelPipeline introduced by a729e0fcd94009905d219665bdd069eb31433b7c

Motivation:

There is a small race while adding handlers to the pipeline because callHandlerAddedForAllHandlers() may not be run when the user calls add* but the Channel is already registered.

Modifications:

Ensure we always delay handlerAdded(..) / handlerRemoved(...) until callHandlerAddedForAllHandlers() was called.

Result:

No more race on pipeline modifications possible.
This commit is contained in:
Norman Maurer 2016-05-30 11:02:41 +02:00
parent 5ec15b3124
commit fd1fa42a7d
2 changed files with 30 additions and 40 deletions

View File

@ -132,24 +132,23 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
final EventExecutor executor;
synchronized (this) { synchronized (this) {
checkMultiplicity(handler); checkMultiplicity(handler);
name = filterName(name, handler); name = filterName(name, handler);
newCtx = newContext(group, name, handler); newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addFirst0(newCtx); addFirst0(newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet. // If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call // In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered. // ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) { if (!registered) {
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
return this; return this;
} }
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@Override @Override
@ -179,23 +178,23 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
synchronized (this) { synchronized (this) {
checkMultiplicity(handler); checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler); newCtx = newContext(group, filterName(name, handler), handler);
executor = executorSafe(newCtx.executor);
addLast0(newCtx); addLast0(newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet. // If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call // In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered. // ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) { if (!registered) {
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
return this; return this;
} }
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@Override @Override
@ -226,7 +225,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public final ChannelPipeline addBefore( public final ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx; final AbstractChannelHandlerContext ctx;
synchronized (this) { synchronized (this) {
@ -235,18 +233,18 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ctx = getContextOrDie(baseName); ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler); newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addBefore0(ctx, newCtx); addBefore0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet. // If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call // In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered. // ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) { if (!registered) {
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
return this; return this;
} }
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@Override @Override
@ -284,7 +282,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public final ChannelPipeline addAfter( public final ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx; final AbstractChannelHandlerContext ctx;
@ -294,17 +291,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ctx = getContextOrDie(baseName); ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler); newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addAfter0(ctx, newCtx); addAfter0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet. // If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call // In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered. // ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) { if (!registered) {
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
return this; return this;
} }
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@Override @Override
@ -424,20 +421,18 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail; assert ctx != head && ctx != tail;
final EventExecutor executor;
synchronized (this) { synchronized (this) {
executor = executorSafe(ctx.executor);
remove0(ctx); remove0(ctx);
// If the executor is null it means that the channel was not registered on an eventloop yet. // If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call // In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered. // ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) { if (!registered) {
callHandlerCallbackLater(ctx, false); callHandlerCallbackLater(ctx, false);
return ctx; return ctx;
} }
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@Override @Override
@ -498,7 +493,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
assert ctx != head && ctx != tail; assert ctx != head && ctx != tail;
final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext newCtx;
final EventExecutor executor;
synchronized (this) { synchronized (this) {
checkMultiplicity(newHandler); checkMultiplicity(newHandler);
boolean sameName = ctx.name().equals(newName); boolean sameName = ctx.name().equals(newName);
@ -507,19 +501,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
newCtx = newContext(ctx.executor, newName, newHandler); newCtx = newContext(ctx.executor, newName, newHandler);
executor = executorSafe(ctx.executor);
replace0(ctx, newCtx); replace0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet. // If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we replace the context in the pipeline // In this case we replace the context in the pipeline
// and add a task that will call ChannelHandler.handlerAdded(...) and // and add a task that will call ChannelHandler.handlerAdded(...) and
// ChannelHandler.handlerRemoved(...) once the channel is registered. // ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) { if (!registered) {
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
callHandlerCallbackLater(ctx, false); callHandlerCallbackLater(ctx, false);
return ctx.handler(); return ctx.handler();
} }
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@Override @Override
@ -1085,16 +1079,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private EventExecutor executorSafe(EventExecutor eventExecutor) {
if (eventExecutor == null) {
// We check for channel().isRegistered and handlerAdded because even if isRegistered() is false we
// can safely access the eventLoop() if handlerAdded is true. This is because in this case the Channel
// was previously registered and so we can still access the old EventLoop to dispatch things.
return channel.isRegistered() || registered ? channel.eventLoop() : null;
}
return eventExecutor;
}
/** /**
* Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
* in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.

View File

@ -716,14 +716,14 @@ public class DefaultChannelPipelineTest {
final AtomicBoolean handlerAdded = new AtomicBoolean(); final AtomicBoolean handlerAdded = new AtomicBoolean();
final Exception exception = new RuntimeException(); final Exception exception = new RuntimeException();
ChannelPipeline pipeline = new LocalChannel().pipeline(); ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addLast(new ChannelHandlerAdapter() { pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
pipeline.addFirst(new ChannelHandlerAdapter() {
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handlerAdded.set(true); handlerAdded.set(true);
throw exception; throw exception;
} }
}); });
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
assertFalse(handlerAdded.get()); assertFalse(handlerAdded.get());
group.register(pipeline.channel()); group.register(pipeline.channel());
promise.syncUninterruptibly(); promise.syncUninterruptibly();
@ -765,7 +765,8 @@ public class DefaultChannelPipelineTest {
final Exception exceptionRemoved = new RuntimeException(); final Exception exceptionRemoved = new RuntimeException();
String handlerName = "foo"; String handlerName = "foo";
ChannelPipeline pipeline = new LocalChannel().pipeline(); ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addLast(handlerName, new ChannelHandlerAdapter() { pipeline.addLast(group1, new CheckExceptionHandler(exceptionAdded, promise));
pipeline.addFirst(handlerName, new ChannelHandlerAdapter() {
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
throw exceptionAdded; throw exceptionAdded;
@ -773,11 +774,16 @@ public class DefaultChannelPipelineTest {
@Override @Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
latch.countDown(); // Execute this later so we are sure the exception is handled first.
ctx.executor().execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
throw exceptionRemoved; throw exceptionRemoved;
} }
}); });
pipeline.addLast(group1, new CheckExceptionHandler(exceptionAdded, promise));
group.register(pipeline.channel()).syncUninterruptibly(); group.register(pipeline.channel()).syncUninterruptibly();
latch.await(); latch.await();
assertNull(pipeline.context(handlerName)); assertNull(pipeline.context(handlerName));