[#2586] Use correct EventLoop to notify delayed successful registration
Motivation: At the moment AbstractBoostrap.bind(...) will always use the GlobalEventExecutor to notify the returned ChannelFuture if the registration is not done yet. This should only be done if the registration fails later. If it completes successful we should just notify with the EventLoop of the Channel. Modification: Use EventLoop of the Channel if possible to use the correct Thread to notify and so guaranteer the right order of events. Result: Use the correct EventLoop for notification
This commit is contained in:
parent
fb538ea532
commit
1278467fec
@ -27,6 +27,7 @@ import io.netty.channel.DefaultChannelPromise;
|
|||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.util.AttributeKey;
|
import io.netty.util.AttributeKey;
|
||||||
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
@ -281,7 +282,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
doBind0(regFuture, channel, localAddress, promise);
|
doBind0(regFuture, channel, localAddress, promise);
|
||||||
} else {
|
} else {
|
||||||
// Registration future is almost always fulfilled already, but just in case it's not.
|
// Registration future is almost always fulfilled already, but just in case it's not.
|
||||||
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
|
promise = new PendingRegistrationPromise(channel);
|
||||||
regFuture.addListener(new ChannelFutureListener() {
|
regFuture.addListener(new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
@ -451,4 +452,21 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
return StringUtil.simpleClassName(clazz) + ".class";
|
return StringUtil.simpleClassName(clazz) + ".class";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class PendingRegistrationPromise extends DefaultChannelPromise {
|
||||||
|
private PendingRegistrationPromise(Channel channel) {
|
||||||
|
super(channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected EventExecutor executor() {
|
||||||
|
if (isSuccess()) {
|
||||||
|
// If the registration was a success we can just call super.executor() which will return
|
||||||
|
// channel.eventLoop().
|
||||||
|
return super.executor();
|
||||||
|
}
|
||||||
|
// The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
|
||||||
|
return GlobalEventExecutor.INSTANCE;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,18 +16,26 @@
|
|||||||
|
|
||||||
package io.netty.bootstrap;
|
package io.netty.bootstrap;
|
||||||
|
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelHandler.Sharable;
|
import io.netty.channel.ChannelHandler.Sharable;
|
||||||
import io.netty.channel.ChannelInboundHandler;
|
import io.netty.channel.ChannelInboundHandler;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.channel.local.LocalAddress;
|
import io.netty.channel.local.LocalAddress;
|
||||||
import io.netty.channel.local.LocalChannel;
|
import io.netty.channel.local.LocalChannel;
|
||||||
import io.netty.channel.local.LocalEventLoopGroup;
|
import io.netty.channel.local.LocalEventLoopGroup;
|
||||||
|
import io.netty.channel.local.LocalServerChannel;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
public class BootstrapTest {
|
public class BootstrapTest {
|
||||||
|
|
||||||
@ -127,6 +135,51 @@ public class BootstrapTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLateRegisterSuccess() throws Exception {
|
||||||
|
TestLocalEventLoopGroup group = new TestLocalEventLoopGroup();
|
||||||
|
try {
|
||||||
|
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||||
|
bootstrap.group(group);
|
||||||
|
bootstrap.channel(LocalServerChannel.class);
|
||||||
|
bootstrap.childHandler(new DummyHandler());
|
||||||
|
bootstrap.localAddress(new LocalAddress("1"));
|
||||||
|
ChannelFuture future = bootstrap.bind();
|
||||||
|
Assert.assertFalse(future.isDone());
|
||||||
|
group.promise.setSuccess();
|
||||||
|
final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
|
||||||
|
future.addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.assertTrue(queue.take());
|
||||||
|
} finally {
|
||||||
|
group.shutdownGracefully();
|
||||||
|
group.terminationFuture().sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class TestLocalEventLoopGroup extends LocalEventLoopGroup {
|
||||||
|
ChannelPromise promise;
|
||||||
|
TestLocalEventLoopGroup() {
|
||||||
|
super(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture register(Channel channel) {
|
||||||
|
super.register(channel).syncUninterruptibly();
|
||||||
|
promise = channel.newPromise();
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelFuture register(Channel channel, ChannelPromise promise) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Sharable
|
@Sharable
|
||||||
private static final class DummyHandler extends ChannelInboundHandlerAdapter { }
|
private static final class DummyHandler extends ChannelInboundHandlerAdapter { }
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user