From cd688c5cde6a13fb4bbfc5f5616c1480b9c6750e Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 3 Jul 2014 21:24:15 +0200 Subject: [PATCH] [#2586] Use correct EventLoop to notify delayed bind failures Motivation: When a bind fails AbstractBootstrap will use the GlobalEventExecutor to notify the ChannelPromise. We should use the EventLoop of the Channel if possible. Modification: Use EventLoop of the Channel if possible to use the correct Thread to notify and so guaranteer the right order of events. Result: Use the correct EventLoop for notification --- .../io/netty/bootstrap/AbstractBootstrap.java | 7 +++- .../io/netty/bootstrap/BootstrapTest.java | 37 ++++++++++++++----- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 557e46f8ef..411a86c0c8 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -300,7 +300,8 @@ public abstract class AbstractBootstrap, C ext init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); - return channel.newFailedFuture(t); + // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor + return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); @@ -460,9 +461,11 @@ public abstract class AbstractBootstrap, C ext @Override protected EventExecutor executor() { - if (isSuccess()) { + if (channel().isRegistered()) { // If the registration was a success we can just call super.executor() which will return // channel.eventLoop(). + // + // See https://github.com/netty/netty/issues/2586 return super.executor(); } // The registration failed so we can only use the GlobalEventExecutor as last resort to notify. diff --git a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java index f9e09a472c..2686d634b2 100644 --- a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java +++ b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java @@ -24,20 +24,20 @@ import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Assert; import org.junit.Test; +import java.net.SocketAddress; +import java.net.SocketException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; public class BootstrapTest { @@ -155,9 +155,11 @@ public class BootstrapTest { @Override public void operationComplete(ChannelFuture future) throws Exception { queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread())); + queue.add(future.isSuccess()); } }); Assert.assertTrue(queue.take()); + Assert.assertTrue(queue.take()); } finally { group.shutdownGracefully(); group.terminationFuture().sync(); @@ -165,24 +167,41 @@ public class BootstrapTest { } @Test - public void testLateRegisterFailed() throws Exception { - final TestLocalEventLoopGroup group = new TestLocalEventLoopGroup(); + public void testLateRegisterSuccessBindFailed() throws Exception { + TestLocalEventLoopGroup group = new TestLocalEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group); - bootstrap.channel(LocalServerChannel.class); + bootstrap.channelFactory(new ChannelFactory() { + @Override + public ServerChannel newChannel() { + return new LocalServerChannel() { + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return newFailedFuture(new SocketException()); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + return promise.setFailure(new SocketException()); + } + }; + } + }); bootstrap.childHandler(new DummyHandler()); bootstrap.localAddress(new LocalAddress("1")); ChannelFuture future = bootstrap.bind(); Assert.assertFalse(future.isDone()); - group.promise.setFailure(new IllegalStateException()); + group.promise.setSuccess(); final BlockingQueue queue = new LinkedBlockingQueue(); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - queue.add(group.next().inEventLoop(Thread.currentThread())); + queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread())); + queue.add(future.isSuccess()); } }); + Assert.assertTrue(queue.take()); Assert.assertFalse(queue.take()); } finally { group.shutdownGracefully(); @@ -204,7 +223,7 @@ public class BootstrapTest { } @Override - public ChannelFuture register(Channel channel, ChannelPromise promise) { + public ChannelFuture register(Channel channel, final ChannelPromise promise) { throw new UnsupportedOperationException(); } }