[#2586] Use correct EventLoop to notify delayed bind failures

Motivation:

When a bind fails AbstractBootstrap will use the GlobalEventExecutor to notify the ChannelPromise. We should use the EventLoop of the Channel if possible.

Modification:

Use EventLoop of the Channel if possible to use the correct Thread to notify and so guaranteer the right order of events.

Result:

Use the correct EventLoop for notification
This commit is contained in:
Norman Maurer 2014-07-03 21:24:15 +02:00
parent 2438ec5e24
commit 9c4a733471
2 changed files with 33 additions and 8 deletions

View File

@ -300,7 +300,8 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
@ -460,9 +461,11 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
@Override
protected EventExecutor executor() {
if (isSuccess()) {
if (channel().isRegistered()) {
// If the registration was a success we can just call super.executor() which will return
// channel.eventLoop().
//
// See https://github.com/netty/netty/issues/2586
return super.executor();
}
// The registration failed so we can only use the GlobalEventExecutor as last resort to notify.

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
@ -32,6 +33,8 @@ import io.netty.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@ -152,9 +155,11 @@ public class BootstrapTest {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
queue.add(future.isSuccess());
}
});
Assert.assertTrue(queue.take());
Assert.assertTrue(queue.take());
} finally {
group.shutdownGracefully();
group.terminationFuture().sync();
@ -162,24 +167,41 @@ public class BootstrapTest {
}
@Test
public void testLateRegisterFailed() throws Exception {
final TestEventLoopGroup group = new TestEventLoopGroup();
public void testLateRegisterSuccessBindFailed() throws Exception {
TestEventLoopGroup group = new TestEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group);
bootstrap.channel(LocalServerChannel.class);
bootstrap.channelFactory(new ChannelFactory<ServerChannel>() {
@Override
public ServerChannel newChannel() {
return new LocalServerChannel() {
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return newFailedFuture(new SocketException());
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return promise.setFailure(new SocketException());
}
};
}
});
bootstrap.childHandler(new DummyHandler());
bootstrap.localAddress(new LocalAddress("1"));
ChannelFuture future = bootstrap.bind();
Assert.assertFalse(future.isDone());
group.promise.setFailure(new IllegalStateException());
group.promise.setSuccess();
final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
queue.add(group.next().inEventLoop(Thread.currentThread()));
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
queue.add(future.isSuccess());
}
});
Assert.assertTrue(queue.take());
Assert.assertFalse(queue.take());
} finally {
group.shutdownGracefully();
@ -201,7 +223,7 @@ public class BootstrapTest {
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
public ChannelFuture register(Channel channel, final ChannelPromise promise) {
throw new UnsupportedOperationException();
}
}