* Added ChannelFutureListener.CLOSE_ON_FAILURE

* Reduced code duplication
* Fixed warnings by FindBugs
This commit is contained in:
Trustin Lee 2009-03-04 14:30:47 +00:00
parent 07c720ad2c
commit 666c943b66
5 changed files with 42 additions and 39 deletions

View File

@ -46,6 +46,18 @@ public interface ChannelFutureListener extends EventListener {
} }
}; };
/**
* A {@link ChannelFutureListener} that closes the {@link Channel} when the
* operation ended up with a failure or cancellation rather than a success.
*/
static ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.getChannel().close();
}
}
};
/** /**
* Invoked when the I/O operation associated with the {@link ChannelFuture} * Invoked when the I/O operation associated with the {@link ChannelFuture}
* has been completed. * has been completed.

View File

@ -44,7 +44,7 @@ import org.jboss.netty.util.ThreadRenamingRunnable;
*/ */
final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink { final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink {
static String LINE_TERMINATOR = "\r\n"; static final String LINE_TERMINATOR = "\r\n";
private final Executor workerExecutor; private final Executor workerExecutor;
HttpTunnelingClientSocketPipelineSink(Executor workerExecutor) { HttpTunnelingClientSocketPipelineSink(Executor workerExecutor) {
@ -111,13 +111,7 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink {
boolean connected = false; boolean connected = false;
boolean workerStarted = false; boolean workerStarted = false;
future.addListener(new ChannelFutureListener() { future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
public void operationComplete(ChannelFuture future) {
if (future.isCancelled()) {
future.getChannel().close();
}
}
});
try { try {
channel.connectAndSendHeaders(false, remoteAddress); channel.connectAndSendHeaders(false, remoteAddress);

View File

@ -144,13 +144,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
if (channel.socket.connect(remoteAddress)) { if (channel.socket.connect(remoteAddress)) {
channel.worker.register(channel, future); channel.worker.register(channel, future);
} else { } else {
future.addListener(new ChannelFutureListener() { future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
public void operationComplete(ChannelFuture future) {
if (future.isCancelled()) {
channel.close();
}
}
});
channel.connectFuture = future; channel.connectFuture = future;
boss.register(channel); boss.register(channel);
} }

View File

@ -114,13 +114,7 @@ class OioClientSocketPipelineSink extends AbstractChannelSink {
boolean connected = false; boolean connected = false;
boolean workerStarted = false; boolean workerStarted = false;
future.addListener(new ChannelFutureListener() { future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
public void operationComplete(ChannelFuture future) {
if (future.isCancelled()) {
future.getChannel().close();
}
}
});
try { try {
channel.socket.connect( channel.socket.connect(

View File

@ -89,23 +89,7 @@ final class XnioClientChannelSink extends AbstractChannelSink {
if (xnioChannel == null) { if (xnioChannel == null) {
FutureConnection fc = FutureConnection fc =
cc.xnioConnector.connectTo(value, HANDLER); cc.xnioConnector.connectTo(value, HANDLER);
fc.addNotifier(new Notifier() { fc.addNotifier(new FutureConnectionNotifier(cc), future);
public void notify(
IoFuture future, Object attachment) {
ChannelFuture cf = (ChannelFuture) attachment;
try {
java.nio.channels.Channel xnioChannel = (java.nio.channels.Channel) future.get();
cc.xnioChannel = xnioChannel;
XnioChannelRegistry.registerChannelMapping(cc);
cf.setSuccess();
} catch (Throwable t) {
cf.setFailure(t);
fireExceptionCaught(cc, t);
} finally {
cc.connecting = false;
}
}
}, future);
} else { } else {
Exception cause = new AlreadyConnectedException(); Exception cause = new AlreadyConnectedException();
future.setFailure(cause); future.setFailure(cause);
@ -151,4 +135,29 @@ final class XnioClientChannelSink extends AbstractChannelSink {
} }
} }
} }
@SuppressWarnings("unchecked")
private static final class FutureConnectionNotifier implements Notifier {
private final XnioClientChannel cc;
FutureConnectionNotifier(XnioClientChannel cc) {
this.cc = cc;
}
public void notify(IoFuture future, Object attachment) {
ChannelFuture cf = (ChannelFuture) attachment;
try {
java.nio.channels.Channel xnioChannel = (java.nio.channels.Channel) future.get();
cc.xnioChannel = xnioChannel;
XnioChannelRegistry.registerChannelMapping(cc);
cf.setSuccess();
} catch (Throwable t) {
cf.setFailure(t);
fireExceptionCaught(cc, t);
} finally {
cc.connecting = false;
}
}
}
} }