More correct fix for using ChannelInitializer with custom EventExecutor. (#8633)

Motivation:

8331248671 did make some changes to fix a race in ChannelInitializer when using with a custom EventExecutor. Unfortunally these where a bit racy and so the testcase failed sometimes.

Modifications:

- More correct fix when using a custom EventExecutor
- Adjust the testcase to be more correct.

Result:

Proper fix for https://github.com/netty/netty/issues/8616.
This commit is contained in:
Norman Maurer 2018-12-07 19:12:06 +01:00
parent 0288570b06
commit 10d7909013
2 changed files with 40 additions and 28 deletions

View File

@ -79,6 +79,9 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelInbou
// we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
// miss an event. // miss an event.
ctx.pipeline().fireChannelRegistered(); ctx.pipeline().fireChannelRegistered();
// We are done with init the Channel, removing all the state for the Channel now.
removeState(ctx);
} else { } else {
// Called initChannel(...) before which is the expected behavior, so just forward the event. // Called initChannel(...) before which is the expected behavior, so just forward the event.
ctx.fireChannelRegistered(); ctx.fireChannelRegistered();
@ -106,7 +109,11 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelInbou
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order. // will be added in the expected order.
initChannel(ctx); if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
} }
} }
@ -125,32 +132,29 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelInbou
// We do so to prevent multiple calls to initChannel(...). // We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause); exceptionCaught(ctx, cause);
} finally { } finally {
remove(ctx); ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} }
return true; return true;
} }
return false; return false;
} }
private void remove(final ChannelHandlerContext ctx) { private void removeState(final ChannelHandlerContext ctx) {
try { // The removal may happen in an async fashion if the EventExecutor we use does something funky.
ChannelPipeline pipeline = ctx.pipeline(); if (ctx.isRemoved()) {
if (pipeline.context(this) != null) { initMap.remove(ctx);
pipeline.remove(this); } else {
} // The context is not removed yet which is most likely the case because a custom EventExecutor is used.
} finally { // Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
// The removal may happen in an async fashion if the EventExecutor we use does something funky. ctx.executor().execute(new Runnable() {
if (ctx.isRemoved()) { @Override
initMap.remove(ctx); public void run() {
} else { initMap.remove(ctx);
// Ensure we always remove from the Map in all cases to not produce a memory leak. }
ctx.channel().closeFuture().addListener(new ChannelFutureListener() { });
@Override
public void operationComplete(ChannelFuture future) {
initMap.remove(ctx);
}
});
}
} }
} }
} }

View File

@ -29,6 +29,7 @@ import org.junit.Test;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -312,6 +313,7 @@ public class ChannelInitializerTest {
} }
}; };
final CountDownLatch latch = new CountDownLatch(1);
ServerBootstrap serverBootstrap = new ServerBootstrap() ServerBootstrap serverBootstrap = new ServerBootstrap()
.channel(LocalServerChannel.class) .channel(LocalServerChannel.class)
.group(group) .group(group)
@ -331,13 +333,20 @@ public class ChannelInitializerTest {
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
// just drop on the floor. // just drop on the floor.
} }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
latch.countDown();
}
}); });
completeCount.incrementAndGet(); completeCount.incrementAndGet();
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
errorRef.set(cause); if (cause instanceof AssertionError) {
errorRef.set(cause);
}
} }
}); });
} }
@ -360,19 +369,18 @@ public class ChannelInitializerTest {
client.closeFuture().sync(); client.closeFuture().sync();
server.closeFuture().sync(); server.closeFuture().sync();
// Give some time to execute everything that was submitted before. latch.await();
Thread.sleep(1000);
executor.shutdown(); assertEquals(1, invokeCount.get());
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
assertEquals(invokeCount.get(), 1);
assertEquals(invokeCount.get(), completeCount.get()); assertEquals(invokeCount.get(), completeCount.get());
Throwable cause = errorRef.get(); Throwable cause = errorRef.get();
if (cause != null) { if (cause != null) {
throw cause; throw cause;
} }
executor.shutdown();
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
} }
private static void closeChannel(Channel c) { private static void closeChannel(Channel c) {