More correct fix for using ChannelInitializer with custom EventExecutor. (#8633)

Motivation:

8331248671 did make some changes to fix a race in ChannelInitializer when using with a custom EventExecutor. Unfortunally these where a bit racy and so the testcase failed sometimes.

Modifications:

- More correct fix when using a custom EventExecutor
- Adjust the testcase to be more correct.

Result:

Proper fix for https://github.com/netty/netty/issues/8616.
This commit is contained in:
Norman Maurer 2018-12-07 19:12:06 +01:00 committed by GitHub
parent 22b2c4c3b8
commit a564b70d51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 28 deletions

View File

@ -79,6 +79,9 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelInbou
// we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
// miss an event.
ctx.pipeline().fireChannelRegistered();
// We are done with init the Channel, removing all the state for the Channel now.
removeState(ctx);
} else {
// Called initChannel(...) before which is the expected behavior, so just forward the event.
ctx.fireChannelRegistered();
@ -106,7 +109,11 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelInbou
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
initChannel(ctx);
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
@ -125,32 +132,29 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelInbou
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
private void remove(final ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
// The removal may happen in an async fashion if the EventExecutor we use does something funky.
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
// Ensure we always remove from the Map in all cases to not produce a memory leak.
ctx.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
initMap.remove(ctx);
}
});
}
private void removeState(final ChannelHandlerContext ctx) {
// The removal may happen in an async fashion if the EventExecutor we use does something funky.
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
// The context is not removed yet which is most likely the case because a custom EventExecutor is used.
// Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
ctx.executor().execute(new Runnable() {
@Override
public void run() {
initMap.remove(ctx);
}
});
}
}
}

View File

@ -29,6 +29,7 @@ import org.junit.Test;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -312,6 +313,7 @@ public class ChannelInitializerTest {
}
};
final CountDownLatch latch = new CountDownLatch(1);
ServerBootstrap serverBootstrap = new ServerBootstrap()
.channel(LocalServerChannel.class)
.group(group)
@ -331,13 +333,20 @@ public class ChannelInitializerTest {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// just drop on the floor.
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
latch.countDown();
}
});
completeCount.incrementAndGet();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
errorRef.set(cause);
if (cause instanceof AssertionError) {
errorRef.set(cause);
}
}
});
}
@ -360,19 +369,18 @@ public class ChannelInitializerTest {
client.closeFuture().sync();
server.closeFuture().sync();
// Give some time to execute everything that was submitted before.
Thread.sleep(1000);
latch.await();
executor.shutdown();
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
assertEquals(invokeCount.get(), 1);
assertEquals(1, invokeCount.get());
assertEquals(invokeCount.get(), completeCount.get());
Throwable cause = errorRef.get();
if (cause != null) {
throw cause;
}
executor.shutdown();
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
}
private static void closeChannel(Channel c) {