[#2586] Use correct EventLoop to notify delayed bind failures
Motivation: When a bind fails AbstractBootstrap will use the GlobalEventExecutor to notify the ChannelPromise. We should use the EventLoop of the Channel if possible. Modification: Use EventLoop of the Channel if possible to use the correct Thread to notify and so guaranteer the right order of events. Result: Use the correct EventLoop for notification
This commit is contained in:
parent
1464abe765
commit
cd688c5cde
@ -300,7 +300,8 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
init(channel);
|
init(channel);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
channel.unsafe().closeForcibly();
|
channel.unsafe().closeForcibly();
|
||||||
return channel.newFailedFuture(t);
|
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
|
||||||
|
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelFuture regFuture = group().register(channel);
|
ChannelFuture regFuture = group().register(channel);
|
||||||
@ -460,9 +461,11 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected EventExecutor executor() {
|
protected EventExecutor executor() {
|
||||||
if (isSuccess()) {
|
if (channel().isRegistered()) {
|
||||||
// If the registration was a success we can just call super.executor() which will return
|
// If the registration was a success we can just call super.executor() which will return
|
||||||
// channel.eventLoop().
|
// channel.eventLoop().
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/2586
|
||||||
return super.executor();
|
return super.executor();
|
||||||
}
|
}
|
||||||
// The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
|
// The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
|
||||||
|
@ -24,20 +24,20 @@ import io.netty.channel.ChannelInboundHandler;
|
|||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.ServerChannel;
|
||||||
import io.netty.channel.local.LocalAddress;
|
import io.netty.channel.local.LocalAddress;
|
||||||
import io.netty.channel.local.LocalChannel;
|
import io.netty.channel.local.LocalChannel;
|
||||||
import io.netty.channel.local.LocalEventLoopGroup;
|
import io.netty.channel.local.LocalEventLoopGroup;
|
||||||
import io.netty.channel.local.LocalServerChannel;
|
import io.netty.channel.local.LocalServerChannel;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.net.SocketException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
public class BootstrapTest {
|
public class BootstrapTest {
|
||||||
@ -155,9 +155,11 @@ public class BootstrapTest {
|
|||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
|
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
|
||||||
|
queue.add(future.isSuccess());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Assert.assertTrue(queue.take());
|
Assert.assertTrue(queue.take());
|
||||||
|
Assert.assertTrue(queue.take());
|
||||||
} finally {
|
} finally {
|
||||||
group.shutdownGracefully();
|
group.shutdownGracefully();
|
||||||
group.terminationFuture().sync();
|
group.terminationFuture().sync();
|
||||||
@ -165,24 +167,41 @@ public class BootstrapTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLateRegisterFailed() throws Exception {
|
public void testLateRegisterSuccessBindFailed() throws Exception {
|
||||||
final TestLocalEventLoopGroup group = new TestLocalEventLoopGroup();
|
TestLocalEventLoopGroup group = new TestLocalEventLoopGroup();
|
||||||
try {
|
try {
|
||||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||||
bootstrap.group(group);
|
bootstrap.group(group);
|
||||||
bootstrap.channel(LocalServerChannel.class);
|
bootstrap.channelFactory(new ChannelFactory<ServerChannel>() {
|
||||||
|
@Override
|
||||||
|
public ServerChannel newChannel() {
|
||||||
|
return new LocalServerChannel() {
|
||||||
|
@Override
|
||||||
|
public ChannelFuture bind(SocketAddress localAddress) {
|
||||||
|
return newFailedFuture(new SocketException());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
|
||||||
|
return promise.setFailure(new SocketException());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
bootstrap.childHandler(new DummyHandler());
|
bootstrap.childHandler(new DummyHandler());
|
||||||
bootstrap.localAddress(new LocalAddress("1"));
|
bootstrap.localAddress(new LocalAddress("1"));
|
||||||
ChannelFuture future = bootstrap.bind();
|
ChannelFuture future = bootstrap.bind();
|
||||||
Assert.assertFalse(future.isDone());
|
Assert.assertFalse(future.isDone());
|
||||||
group.promise.setFailure(new IllegalStateException());
|
group.promise.setSuccess();
|
||||||
final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
|
final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
|
||||||
future.addListener(new ChannelFutureListener() {
|
future.addListener(new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
queue.add(group.next().inEventLoop(Thread.currentThread()));
|
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
|
||||||
|
queue.add(future.isSuccess());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
Assert.assertTrue(queue.take());
|
||||||
Assert.assertFalse(queue.take());
|
Assert.assertFalse(queue.take());
|
||||||
} finally {
|
} finally {
|
||||||
group.shutdownGracefully();
|
group.shutdownGracefully();
|
||||||
@ -204,7 +223,7 @@ public class BootstrapTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture register(Channel channel, ChannelPromise promise) {
|
public ChannelFuture register(Channel channel, final ChannelPromise promise) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user