Fix #153: Add ChannelFuture.rethrowIfFailed()

This commit is contained in:
Trustin Lee 2012-01-19 13:33:37 +09:00
parent c1aa8b4c7b
commit 40ef4d2ccf
6 changed files with 114 additions and 57 deletions

View File

@ -58,66 +58,72 @@ public class WebSocketClient {
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
String protocol = uri.getScheme();
if (!protocol.equals("ws")) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
HashMap<String, String> customHeaders = new HashMap<String, String>();
customHeaders.put("MyHeader", "MyValue");
// Connect with V13 (RFC 6455). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
final WebSocketClientHandshaker handshaker =
new WebSocketClientHandshakerFactory().newHandshaker(
uri, WebSocketVersion.V13, null, false, customHeaders);
Channel ch = null;
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// If you wish to support HyBi V00, you need to use
// WebSocketHttpResponseDecoder instead for
// HttpResponseDecoder.
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("ws-handler", new WebSocketClientHandler(handshaker));
return pipeline;
try {
String protocol = uri.getScheme();
if (!protocol.equals("ws")) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
});
// Connect
System.out.println("WebSocket Client connecting");
ChannelFuture future =
bootstrap.connect(
new InetSocketAddress(uri.getHost(), uri.getPort()));
future.awaitUninterruptibly();
Channel ch = future.getChannel();
handshaker.handshake(ch).awaitUninterruptibly();
// Send 10 messages and wait for responses
System.out.println("WebSocket Client sending message");
for (int i = 0; i < 10; i++) {
ch.write(new TextWebSocketFrame("Message #" + i));
HashMap<String, String> customHeaders = new HashMap<String, String>();
customHeaders.put("MyHeader", "MyValue");
// Connect with V13 (RFC 6455). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
final WebSocketClientHandshaker handshaker =
new WebSocketClientHandshakerFactory().newHandshaker(
uri, WebSocketVersion.V13, null, false, customHeaders);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// If you wish to support HyBi V00, you need to use
// WebSocketHttpResponseDecoder instead for
// HttpResponseDecoder.
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("ws-handler", new WebSocketClientHandler(handshaker));
return pipeline;
}
});
// Connect
System.out.println("WebSocket Client connecting");
ChannelFuture future =
bootstrap.connect(
new InetSocketAddress(uri.getHost(), uri.getPort()));
future.awaitUninterruptibly().rethrowIfFailed();
ch = future.getChannel();
handshaker.handshake(ch).awaitUninterruptibly().rethrowIfFailed();
// Send 10 messages and wait for responses
System.out.println("WebSocket Client sending message");
for (int i = 0; i < 10; i++) {
ch.write(new TextWebSocketFrame("Message #" + i));
}
// Ping
System.out.println("WebSocket Client sending ping");
ch.write(new PingWebSocketFrame(ChannelBuffers.copiedBuffer(new byte[]{1, 2, 3, 4, 5, 6})));
// Close
System.out.println("WebSocket Client sending close");
ch.write(new CloseWebSocketFrame());
// WebSocketClientHandler will close the connection when the server
// responds to the CloseWebSocketFrame.
ch.getCloseFuture().awaitUninterruptibly();
} finally {
if (ch != null) {
ch.close();
}
bootstrap.releaseExternalResources();
}
// Ping
System.out.println("WebSocket Client sending ping");
ch.write(new PingWebSocketFrame(ChannelBuffers.copiedBuffer(new byte[]{1, 2, 3, 4, 5, 6})));
// Close
System.out.println("WebSocket Client sending close");
ch.write(new CloseWebSocketFrame());
// WebSocketClientHandler will close the connection when the server
// responds to the CloseWebSocketFrame.
ch.getCloseFuture().awaitUninterruptibly();
bootstrap.releaseExternalResources();
}
public static void main(String[] args) throws Exception {

View File

@ -258,6 +258,12 @@ public interface ChannelFuture {
*/
void removeListener(ChannelFutureListener listener);
/**
* Rethrows the exception that caused this future fail if this future is
* complete and failed.
*/
ChannelFuture rethrowIfFailed() throws Exception;
/**
* Waits for this future to be completed.
*

View File

@ -175,6 +175,28 @@ public class DefaultChannelFuture implements ChannelFuture {
}
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
if (!isDone()) {
return this;
}
Throwable cause = getCause();
if (cause == null) {
return this;
}
if (cause instanceof Exception) {
throw (Exception) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw new RuntimeException(cause);
}
@Override
public ChannelFuture await() throws InterruptedException {
if (Thread.interrupted()) {

View File

@ -47,4 +47,17 @@ public class FailedChannelFuture extends CompleteChannelFuture {
public boolean isSuccess() {
return false;
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
if (cause instanceof Exception) {
throw (Exception) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw new RuntimeException(cause);
}
}

View File

@ -40,4 +40,9 @@ public class SucceededChannelFuture extends CompleteChannelFuture {
public boolean isSuccess() {
return true;
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
return this;
}
}

View File

@ -99,6 +99,11 @@ public class CompleteChannelFutureTest {
public boolean isSuccess() {
throw new Error();
}
@Override
public ChannelFuture rethrowIfFailed() throws Exception {
throw new Error();
}
}
private static class ExpectedError extends Error {