Added ConnectionlessBootstrap.connect()

This commit is contained in:
Trustin Lee 2009-04-28 05:44:40 +00:00
parent f69cb45ff9
commit ef5fc808a2
2 changed files with 124 additions and 7 deletions

View File

@ -229,7 +229,9 @@ public class ClientBootstrap extends Bootstrap {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
pipeline.addFirst("connector", new Connector(remoteAddress, localAddress, futureQueue));
pipeline.addFirst(
"connector", new Connector(
this, remoteAddress, localAddress, futureQueue));
getFactory().newChannel(pipeline);
@ -249,15 +251,19 @@ public class ClientBootstrap extends Bootstrap {
}
@ChannelPipelineCoverage("one")
private final class Connector extends SimpleChannelUpstreamHandler {
static final class Connector extends SimpleChannelUpstreamHandler {
private final Bootstrap bootstrap;
private final SocketAddress localAddress;
private final BlockingQueue<ChannelFuture> futureQueue;
private final SocketAddress remoteAddress;
private volatile boolean finished = false;
Connector(SocketAddress remoteAddress,
Connector(
Bootstrap bootstrap,
SocketAddress remoteAddress,
SocketAddress localAddress,
BlockingQueue<ChannelFuture> futureQueue) {
this.bootstrap = bootstrap;
this.localAddress = localAddress;
this.futureQueue = futureQueue;
this.remoteAddress = remoteAddress;
@ -270,7 +276,7 @@ public class ClientBootstrap extends Bootstrap {
context.sendUpstream(event);
// Apply options.
event.getChannel().getConfig().setOptions(getOptions());
event.getChannel().getConfig().setOptions(bootstrap.getOptions());
// Bind or connect.
if (localAddress != null) {

View File

@ -200,7 +200,7 @@ public class ConnectionlessBootstrap extends Bootstrap {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
pipeline.addFirst("binder", new Binder(localAddress, futureQueue));
pipeline.addFirst("binder", new ConnectionlessBinder(localAddress, futureQueue));
Channel channel = getFactory().newChannel(pipeline);
@ -227,12 +227,12 @@ public class ConnectionlessBootstrap extends Bootstrap {
}
@ChannelPipelineCoverage("one")
private final class Binder extends SimpleChannelUpstreamHandler {
private final class ConnectionlessBinder extends SimpleChannelUpstreamHandler {
private final SocketAddress localAddress;
private final BlockingQueue<ChannelFuture> futureQueue;
Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {
ConnectionlessBinder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {
this.localAddress = localAddress;
this.futureQueue = futureQueue;
}
@ -260,4 +260,115 @@ public class ConnectionlessBootstrap extends Bootstrap {
ctx.sendUpstream(e);
}
}
/**
* Creates a new connected channel with the current {@code "remoteAddress"}
* and {@code "localAddress"} option. If the {@code "localAddress"} option
* is not set, the local address of a new channel is determined
* automatically. This method is similar to the following code:
*
* <pre>
* ConnectionlessBootstrap b = ...;
* b.connect(b.getOption("remoteAddress"), b.getOption("localAddress"));
* </pre>
*
* @return a future object which notifies when the creation of the connected
* channel succeeds or fails
*
* @throws IllegalStateException
* if {@code "remoteAddress"} option was not set
* @throws ClassCastException
* if {@code "remoteAddress"} or {@code "localAddress"} option's
* value is neither a {@link SocketAddress} nor {@code null}
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect() {
SocketAddress remoteAddress = (SocketAddress) getOption("remoteAddress");
if (remoteAddress == null) {
throw new IllegalStateException("remoteAddress option is not set.");
}
return connect(remoteAddress);
}
/**
* Creates a new connected channel with the specified
* {@code "remoteAddress"} and the current {@code "localAddress"} option.
* If the {@code "localAddress"} option is not set, the local address of
* a new channel is determined automatically. This method is identical
* with the following code:
*
* <pre>
* ClientBootstrap b = ...;
* b.connect(remoteAddress, b.getOption("localAddress"));
* </pre>
*
* @return a future object which notifies when the creation of the connected
* channel succeeds or fails
*
* @throws ClassCastException
* if {@code "localAddress"} option's value is
* neither a {@link SocketAddress} nor {@code null}
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remotedAddress");
}
SocketAddress localAddress = (SocketAddress) getOption("localAddress");
return connect(remoteAddress, localAddress);
}
/**
* Creates a new connected channel with the specified
* {@code "remoteAddress"} and the specified {@code "localAddress"}.
* If the specified local address is {@code null}, the local address of a
* new channel is determined automatically.
*
* @return a future object which notifies when the creation of the connected
* channel succeeds or fails
*
* @throws ChannelPipelineException
* if this bootstrap's {@link #setPipelineFactory(ChannelPipelineFactory) pipelineFactory}
* failed to create a new {@link ChannelPipeline}
*/
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
ChannelPipeline pipeline;
try {
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
pipeline.addFirst(
"connector", new ClientBootstrap.Connector(
this, remoteAddress, localAddress, futureQueue));
getFactory().newChannel(pipeline);
// Wait until the future is available.
ChannelFuture future = null;
do {
try {
future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Ignore
}
} while (future == null);
pipeline.remove("connector");
return future;
}
}