From 10d790901396ba37be9c21253e57c88846d6f124 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 7 Dec 2018 19:12:06 +0100 Subject: [PATCH] More correct fix for using ChannelInitializer with custom EventExecutor. (#8633) Motivation: 8331248671b9c0ea07cf8dbdfa5d8d2f89fdf459 did make some changes to fix a race in ChannelInitializer when using with a custom EventExecutor. Unfortunally these where a bit racy and so the testcase failed sometimes. Modifications: - More correct fix when using a custom EventExecutor - Adjust the testcase to be more correct. Result: Proper fix for https://github.com/netty/netty/issues/8616. --- .../io/netty/channel/ChannelInitializer.java | 46 ++++++++++--------- .../netty/channel/ChannelInitializerTest.java | 22 ++++++--- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/ChannelInitializer.java b/transport/src/main/java/io/netty/channel/ChannelInitializer.java index 18344d200f..9aa4eaa570 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInitializer.java +++ b/transport/src/main/java/io/netty/channel/ChannelInitializer.java @@ -79,6 +79,9 @@ public abstract class ChannelInitializer extends ChannelInbou // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not // miss an event. ctx.pipeline().fireChannelRegistered(); + + // We are done with init the Channel, removing all the state for the Channel now. + removeState(ctx); } else { // Called initChannel(...) before which is the expected behavior, so just forward the event. ctx.fireChannelRegistered(); @@ -106,7 +109,11 @@ public abstract class ChannelInitializer extends ChannelInbou // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. - initChannel(ctx); + if (initChannel(ctx)) { + + // We are done with init the Channel, removing the initializer now. + removeState(ctx); + } } } @@ -125,32 +132,29 @@ public abstract class ChannelInitializer extends ChannelInbou // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { - remove(ctx); + ChannelPipeline pipeline = ctx.pipeline(); + if (pipeline.context(this) != null) { + pipeline.remove(this); + } } return true; } return false; } - private void remove(final ChannelHandlerContext ctx) { - try { - ChannelPipeline pipeline = ctx.pipeline(); - if (pipeline.context(this) != null) { - pipeline.remove(this); - } - } finally { - // The removal may happen in an async fashion if the EventExecutor we use does something funky. - if (ctx.isRemoved()) { - initMap.remove(ctx); - } else { - // Ensure we always remove from the Map in all cases to not produce a memory leak. - ctx.channel().closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - initMap.remove(ctx); - } - }); - } + private void removeState(final ChannelHandlerContext ctx) { + // The removal may happen in an async fashion if the EventExecutor we use does something funky. + if (ctx.isRemoved()) { + initMap.remove(ctx); + } else { + // The context is not removed yet which is most likely the case because a custom EventExecutor is used. + // Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded. + ctx.executor().execute(new Runnable() { + @Override + public void run() { + initMap.remove(ctx); + } + }); } } } diff --git a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java index 2ac1bcdefa..bebf2d5e58 100644 --- a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -312,6 +313,7 @@ public class ChannelInitializerTest { } }; + final CountDownLatch latch = new CountDownLatch(1); ServerBootstrap serverBootstrap = new ServerBootstrap() .channel(LocalServerChannel.class) .group(group) @@ -331,13 +333,20 @@ public class ChannelInitializerTest { public void channelRead(ChannelHandlerContext ctx, Object msg) { // just drop on the floor. } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + latch.countDown(); + } }); completeCount.incrementAndGet(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - errorRef.set(cause); + if (cause instanceof AssertionError) { + errorRef.set(cause); + } } }); } @@ -360,19 +369,18 @@ public class ChannelInitializerTest { client.closeFuture().sync(); server.closeFuture().sync(); - // Give some time to execute everything that was submitted before. - Thread.sleep(1000); + latch.await(); - executor.shutdown(); - assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); - - assertEquals(invokeCount.get(), 1); + assertEquals(1, invokeCount.get()); assertEquals(invokeCount.get(), completeCount.get()); Throwable cause = errorRef.get(); if (cause != null) { throw cause; } + + executor.shutdown(); + assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); } private static void closeChannel(Channel c) {