[#937] Allow to bind in an async fashion via the new method ServerBootstrap.bindAsync(..)

This commit is contained in:
Norman Maurer 2013-01-15 15:50:38 +01:00
parent fc5606e034
commit fe09ad36e2

View File

@ -20,6 +20,7 @@ import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
@ -27,6 +28,7 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ChildChannelStateEvent; import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.DefaultChannelFuture;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.ServerChannelFactory; import org.jboss.netty.channel.ServerChannelFactory;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@ -36,9 +38,6 @@ import java.net.SocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.jboss.netty.channel.Channels.*; import static org.jboss.netty.channel.Channels.*;
@ -232,6 +231,8 @@ public class ServerBootstrap extends Bootstrap {
* b.bind(b.getOption("localAddress")); * b.bind(b.getOption("localAddress"));
* </pre> * </pre>
* *
* This operation will block until the channel is bound.
*
* @return a new bound channel which accepts incoming connections * @return a new bound channel which accepts incoming connections
* *
* @throws IllegalStateException * @throws IllegalStateException
@ -252,7 +253,8 @@ public class ServerBootstrap extends Bootstrap {
} }
/** /**
* Creates a new channel which is bound to the specified local address. * Creates a new channel which is bound to the specified local address. This operation will block until
* the channel is bound.
* *
* @return a new bound channel which accepts incoming connections * @return a new bound channel which accepts incoming connections
* *
@ -261,14 +263,30 @@ public class ServerBootstrap extends Bootstrap {
* bind it to the local address * bind it to the local address
*/ */
public Channel bind(final SocketAddress localAddress) { public Channel bind(final SocketAddress localAddress) {
ChannelFuture future = bindAsync(localAddress);
// Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getChannel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
}
return future.getChannel();
}
/**
* Creates a new channel which is bound to the specified local address.
*
* @return a new {@link ChannelFuture} which will be notified once the Channel is
* bound and accepts incoming connections
*
*/
public ChannelFuture bindAsync(final SocketAddress localAddress) {
if (localAddress == null) { if (localAddress == null) {
throw new NullPointerException("localAddress"); throw new NullPointerException("localAddress");
} }
Binder binder = new Binder(localAddress);
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
ChannelHandler binder = new Binder(localAddress, futureQueue);
ChannelHandler parentHandler = getParentHandler(); ChannelHandler parentHandler = getParentHandler();
ChannelPipeline bossPipeline = pipeline(); ChannelPipeline bossPipeline = pipeline();
@ -278,42 +296,29 @@ public class ServerBootstrap extends Bootstrap {
} }
Channel channel = getFactory().newChannel(bossPipeline); Channel channel = getFactory().newChannel(bossPipeline);
final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
// Wait until the future is available. binder.bindFuture.addListener(new ChannelFutureListener() {
ChannelFuture future = null; public void operationComplete(ChannelFuture future) throws Exception {
boolean interrupted = false; if (future.isSuccess()) {
do { bfuture.setSuccess();
try { } else {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS); // Call close on bind failure
} catch (InterruptedException e) { bfuture.getChannel().close();
interrupted = true; bfuture.setFailure(future.getCause());
}
} }
} while (future == null); });
return bfuture;
if (interrupted) {
Thread.currentThread().interrupt();
}
// Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getChannel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
}
return channel;
} }
private final class Binder extends SimpleChannelUpstreamHandler { private final class Binder extends SimpleChannelUpstreamHandler {
private final SocketAddress localAddress; private final SocketAddress localAddress;
private final BlockingQueue<ChannelFuture> futureQueue;
private final Map<String, Object> childOptions = private final Map<String, Object> childOptions =
new HashMap<String, Object>(); new HashMap<String, Object>();
private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);
Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) { Binder(SocketAddress localAddress) {
this.localAddress = localAddress; this.localAddress = localAddress;
this.futureQueue = futureQueue;
} }
@Override @Override
@ -343,8 +348,15 @@ public class ServerBootstrap extends Bootstrap {
ctx.sendUpstream(evt); ctx.sendUpstream(evt);
} }
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
assert finished; public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bindFuture.setSuccess();
} else {
bindFuture.setFailure(future.getCause());
}
}
});
} }
@Override @Override
@ -364,8 +376,7 @@ public class ServerBootstrap extends Bootstrap {
public void exceptionCaught( public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception { throws Exception {
boolean finished = futureQueue.offer(failedFuture(e.getChannel(), e.getCause())); bindFuture.setFailure(e.getCause());
assert finished;
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
} }