Fix small race in DefaultChannelPipeline introduced by a729e0fcd94009905d219665bdd069eb31433b7c

Motivation:

There is a small race while adding handlers to the pipeline because callHandlerAddedForAllHandlers() may not be run when the user calls add* but the Channel is already registered.

Modifications:

Ensure we always delay handlerAdded(..) / handlerRemoved(...) until callHandlerAddedForAllHandlers() was called.

Result:

No more race on pipeline modifications possible.
This commit is contained in:
Norman Maurer 2016-05-30 11:02:41 +02:00
parent dcd93e3be0
commit 339b512e70
2 changed files with 30 additions and 40 deletions

View File

@ -143,24 +143,23 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
final EventExecutor executor;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addFirst0(newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) {
if (!registered) {
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
@ -190,23 +189,23 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
executor = executorSafe(newCtx.executor);
addLast0(newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) {
if (!registered) {
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
@ -237,7 +236,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
synchronized (this) {
@ -246,18 +244,18 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addBefore0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (executor == null) {
if (!registered) {
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
@ -295,7 +293,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
@ -305,17 +302,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
addAfter0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) {
if (!registered) {
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
@ -435,20 +432,18 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
final EventExecutor executor;
synchronized (this) {
executor = executorSafe(ctx.executor);
remove0(ctx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) {
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
@ -509,7 +504,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
assert ctx != head && ctx != tail;
final AbstractChannelHandlerContext newCtx;
final EventExecutor executor;
synchronized (this) {
checkMultiplicity(newHandler);
if (newName == null) {
@ -522,19 +516,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
newCtx = newContext(ctx.executor, newName, newHandler);
executor = executorSafe(ctx.executor);
replace0(ctx, newCtx);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we replace the context in the pipeline
// and add a task that will call ChannelHandler.handlerAdded(...) and
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (executor == null) {
if (!registered) {
callHandlerCallbackLater(newCtx, true);
callHandlerCallbackLater(ctx, false);
return ctx.handler();
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new OneTimeTask() {
@Override
@ -1125,16 +1119,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
private EventExecutor executorSafe(EventExecutor eventExecutor) {
if (eventExecutor == null) {
// We check for channel().isRegistered and handlerAdded because even if isRegistered() is false we
// can safely access the eventLoop() if handlerAdded is true. This is because in this case the Channel
// was previously registered and so we can still access the old EventLoop to dispatch things.
return channel.isRegistered() || registered ? channel.eventLoop() : null;
}
return eventExecutor;
}
/**
* Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
* in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.

View File

@ -729,14 +729,14 @@ public class DefaultChannelPipelineTest {
final AtomicBoolean handlerAdded = new AtomicBoolean();
final Exception exception = new RuntimeException();
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addLast(new ChannelHandlerAdapter() {
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
pipeline.addFirst(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handlerAdded.set(true);
throw exception;
}
});
pipeline.addLast(group1, new CheckExceptionHandler(exception, promise));
assertFalse(handlerAdded.get());
group.register(pipeline.channel());
promise.syncUninterruptibly();
@ -778,7 +778,8 @@ public class DefaultChannelPipelineTest {
final Exception exceptionRemoved = new RuntimeException();
String handlerName = "foo";
ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addLast(handlerName, new ChannelHandlerAdapter() {
pipeline.addLast(group1, new CheckExceptionHandler(exceptionAdded, promise));
pipeline.addFirst(handlerName, new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
throw exceptionAdded;
@ -786,11 +787,16 @@ public class DefaultChannelPipelineTest {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
// Execute this later so we are sure the exception is handled first.
ctx.executor().execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
throw exceptionRemoved;
}
});
pipeline.addLast(group1, new CheckExceptionHandler(exceptionAdded, promise));
group.register(pipeline.channel()).syncUninterruptibly();
latch.await();
assertNull(pipeline.context(handlerName));