[#2586] Use correct EventLoop to notify delayed successful registration

Motivation:

At the moment AbstractBoostrap.bind(...) will always use the GlobalEventExecutor to notify the returned ChannelFuture if the registration is not done yet. This should only be done if the registration fails later. If it completes successful we should just notify with the EventLoop of the Channel.

Modification:

Use EventLoop of the Channel if possible to use the correct Thread to notify and so guaranteer the right order of events.

Result:

Use the correct EventLoop for notification
This commit is contained in:
Norman Maurer 2014-06-20 16:51:28 +02:00
parent 217a666b1f
commit f341562b91
2 changed files with 72 additions and 1 deletions

View File

@ -27,6 +27,7 @@ import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.StringUtil;
@ -281,7 +282,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -451,4 +452,21 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
return StringUtil.simpleClassName(clazz) + ".class";
}
}
private static final class PendingRegistrationPromise extends DefaultChannelPromise {
private PendingRegistrationPromise(Channel channel) {
super(channel);
}
@Override
protected EventExecutor executor() {
if (isSuccess()) {
// If the registration was a success we can just call super.executor() which will return
// channel.eventLoop().
return super.executor();
}
// The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
return GlobalEventExecutor.INSTANCE;
}
}
}

View File

@ -17,17 +17,25 @@
package io.netty.bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BootstrapTest {
@ -127,6 +135,51 @@ public class BootstrapTest {
}
}
@Test
public void testLateRegisterSuccess() throws Exception {
TestEventLoopGroup group = new TestEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group);
bootstrap.channel(LocalServerChannel.class);
bootstrap.childHandler(new DummyHandler());
bootstrap.localAddress(new LocalAddress("1"));
ChannelFuture future = bootstrap.bind();
Assert.assertFalse(future.isDone());
group.promise.setSuccess();
final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
}
});
Assert.assertTrue(queue.take());
} finally {
group.shutdownGracefully();
group.terminationFuture().sync();
}
}
private static final class TestEventLoopGroup extends DefaultEventLoopGroup {
ChannelPromise promise;
TestEventLoopGroup() {
super(1);
}
@Override
public ChannelFuture register(Channel channel) {
super.register(channel).syncUninterruptibly();
promise = channel.newPromise();
return promise;
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
}
@Sharable
private static final class DummyHandler extends ChannelHandlerAdapter { }
}