[#661] WebSocketClientHandshaker is broken.

- Remove HttpRequestEncoder after handshaking is complete
- Fix a bug in the WebSocket client example where it sends a frame even before handshake is complete
This commit is contained in:
Trustin Lee 2012-10-16 14:40:39 -07:00
parent b76c39318d
commit a6c4f651a7
5 changed files with 47 additions and 13 deletions

View File

@ -174,7 +174,9 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline(); ChannelPipeline p = future.channel().pipeline();
p.replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket00FrameEncoder()); p.addAfter(
p.context(HttpRequestEncoder.class).name(),
"ws-encoder", new WebSocket00FrameEncoder());
} }
}); });
@ -233,7 +235,9 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
setHandshakeComplete(); setHandshakeComplete();
channel.pipeline().get(HttpResponseDecoder.class).replace( ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.get(HttpResponseDecoder.class).replace(
"ws-decoder", new WebSocket00FrameDecoder(getMaxFramePayloadLength())); "ws-decoder", new WebSocket00FrameDecoder(getMaxFramePayloadLength()));
} }

View File

@ -153,7 +153,9 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline(); ChannelPipeline p = future.channel().pipeline();
p.replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket08FrameEncoder(true)); p.addAfter(
p.context(HttpRequestEncoder.class).name(),
"ws-encoder", new WebSocket08FrameEncoder(true));
} }
}); });
@ -210,7 +212,9 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
setHandshakeComplete(); setHandshakeComplete();
channel.pipeline().get(HttpResponseDecoder.class).replace( ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.get(HttpResponseDecoder.class).replace(
"ws-decoder", "ws-decoder",
new WebSocket08FrameDecoder(false, allowExtensions, getMaxFramePayloadLength())); new WebSocket08FrameDecoder(false, allowExtensions, getMaxFramePayloadLength()));
} }

View File

@ -153,7 +153,9 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline(); ChannelPipeline p = future.channel().pipeline();
p.replace(HttpRequestEncoder.class, "ws-encoder", new WebSocket13FrameEncoder(true)); p.addAfter(
p.context(HttpRequestEncoder.class).name(),
"ws-encoder", new WebSocket13FrameEncoder(true));
} }
}); });
@ -210,7 +212,9 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
setHandshakeComplete(); setHandshakeComplete();
channel.pipeline().get(HttpResponseDecoder.class).replace( ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.get(HttpResponseDecoder.class).replace(
"ws-decoder", "ws-decoder",
new WebSocket13FrameDecoder(false, allowExtensions, getMaxFramePayloadLength())); new WebSocket13FrameDecoder(false, allowExtensions, getMaxFramePayloadLength()));
} }

View File

@ -49,7 +49,6 @@ import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.handler.codec.http.websocketx.WebSocketVersion;
@ -79,9 +78,10 @@ public class WebSocketClient {
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change // If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline. // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
final WebSocketClientHandshaker handshaker = final WebSocketClientHandler handler =
new WebSocketClientHandshakerFactory().newHandshaker( new WebSocketClientHandler(
uri, WebSocketVersion.V13, null, false, customHeaders); new WebSocketClientHandshakerFactory().newHandshaker(
uri, WebSocketVersion.V13, null, false, customHeaders));
b.group(new NioEventLoopGroup()) b.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
@ -92,17 +92,17 @@ public class WebSocketClient {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new HttpResponseDecoder()); pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder()); pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("ws-handler", new WebSocketClientHandler(handshaker)); pipeline.addLast("ws-handler", handler);
} }
}); });
System.out.println("WebSocket Client connecting"); System.out.println("WebSocket Client connecting");
Channel ch = b.connect().sync().channel(); Channel ch = b.connect().sync().channel();
handshaker.handshake(ch).sync(); handler.handshakeFuture().sync();
// Send 10 messages and wait for responses // Send 10 messages and wait for responses
System.out.println("WebSocket Client sending message"); System.out.println("WebSocket Client sending message");
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1; i++) {
ch.write(new TextWebSocketFrame("Message #" + i)); ch.write(new TextWebSocketFrame("Message #" + i));
} }

View File

@ -38,6 +38,7 @@
package io.netty.example.http.websocketx.client; package io.netty.example.http.websocketx.client;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
@ -51,11 +52,26 @@ import io.netty.util.CharsetUtil;
public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter<Object> { public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter<Object> {
private final WebSocketClientHandshaker handshaker; private final WebSocketClientHandshaker handshaker;
private ChannelFuture handshakeFuture;
public WebSocketClientHandler(WebSocketClientHandshaker handshaker) { public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker; this.handshaker = handshaker;
} }
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
handshakeFuture = ctx.newFuture();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
handshaker.handshake(ctx.channel());
}
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("WebSocket Client disconnected!"); System.out.println("WebSocket Client disconnected!");
@ -67,6 +83,7 @@ public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter<
if (!handshaker.isHandshakeComplete()) { if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (HttpResponse) msg); handshaker.finishHandshake(ch, (HttpResponse) msg);
System.out.println("WebSocket Client connected!"); System.out.println("WebSocket Client connected!");
handshakeFuture.setSuccess();
return; return;
} }
@ -91,6 +108,11 @@ public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter<
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close(); ctx.close();
} }
} }