Make WebSocket codec also work when HttpClientCodec and HttpServerCodec is used.

Also refactor the handshakers to share more code and make it easier to implement a new one and less error-prone
This commit is contained in:
Norman Maurer 2013-03-07 10:57:27 +01:00 committed by Trustin Lee
parent 41ab17b9bf
commit 6ac9b17ddd
13 changed files with 282 additions and 370 deletions

View File

@ -64,6 +64,14 @@ public final class HttpClientCodec
this(4096, 8192, 8192, false);
}
public void setSingleDecode(boolean singleDecode) {
decoder().setSingleDecode(singleDecode);
}
public boolean isSingleDecode() {
return decoder().isSingleDecode();
}
/**
* Creates a new instance with the specified decoder options.
*/

View File

@ -17,9 +17,18 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import java.net.URI;
@ -94,7 +103,7 @@ public abstract class WebSocketClientHandshaker {
return handshakeComplete;
}
protected void setHandshakeComplete() {
private void setHandshakeComplete() {
handshakeComplete = true;
}
@ -113,7 +122,7 @@ public abstract class WebSocketClientHandshaker {
return actualSubprotocol;
}
protected void setActualSubprotocol(String actualSubprotocol) {
private void setActualSubprotocol(String actualSubprotocol) {
this.actualSubprotocol = actualSubprotocol;
}
@ -138,7 +147,49 @@ public abstract class WebSocketClientHandshaker {
* @param promise
* the {@link ChannelPromise} to be notified when the opening handshake is sent
*/
public abstract ChannelFuture handshake(Channel channel, ChannelPromise promise);
public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
FullHttpRequest request = newHandshakeRequest();
HttpResponseDecoder decoder = channel.pipeline().get(HttpResponseDecoder.class);
if (decoder == null) {
HttpClientCodec codec = channel.pipeline().get(HttpClientCodec.class);
if (codec == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpResponseDecoder or HttpClientCodec"));
return promise;
}
codec.setSingleDecode(true);
} else {
decoder.setSingleDecode(true);
}
channel.write(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
}
if (ctx == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec"));
return;
}
p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
/**
* Returns a new {@link FullHttpRequest) which will be used for the handshake.
*/
protected abstract FullHttpRequest newHandshakeRequest();
/**
* Validates and finishes the opening handshake initiated by {@link #handshake}}.
@ -148,5 +199,40 @@ public abstract class WebSocketClientHandshaker {
* @param response
* HTTP response containing the closing handshake details
*/
public abstract void finishHandshake(Channel channel, FullHttpResponse response);
public final void finishHandshake(Channel channel, FullHttpResponse response) {
verify(response);
setActualSubprotocol(response.headers().get(HttpHeaders.Names.SEC_WEBSOCKET_PROTOCOL));
setHandshakeComplete();
ChannelPipeline p = channel.pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
if (ctx == null) {
throw new IllegalStateException("ChannelPipeline does not contain " +
"a HttpRequestEncoder or HttpClientCodec");
}
p.replaceAndForward(ctx.name(), "ws-decoder", newWebsocketDecoder());
} else {
p.remove(HttpRequestEncoder.class);
p.replaceAndForward(ctx.name(),
"ws-decoder", newWebsocketDecoder());
}
}
/**
* Verfiy the {@link FullHttpResponse} and throws a {@link WebSocketHandshakeException} if something is wrong.
*/
protected abstract void verify(FullHttpResponse response);
/**
* Returns the decoder to use after handshake is complete.
*/
protected abstract ChannelInboundByteHandler newWebsocketDecoder();
/**
* Returns the encoder to use after the handshake is complete.
*/
protected abstract ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder();
}

View File

@ -17,11 +17,8 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
@ -29,8 +26,6 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
@ -88,11 +83,9 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
* ^n:ds[4U
* </pre>
*
* @param channel
* Channel into which we can write our request
*/
@Override
public ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
protected FullHttpRequest newHandshakeRequest() {
// Make keys
int spaces1 = WebSocketUtil.randomNumber(1, 12);
int spaces2 = WebSocketUtil.randomNumber(1, 12);
@ -173,27 +166,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
// See also: http://www.ietf.org/mail-archive/web/hybi/current/msg02149.html
headers.set(Names.CONTENT_LENGTH, key3.length);
request.data().writeBytes(key3);
channel.pipeline().get(HttpResponseDecoder.class).setSingleDecode(true);
ChannelFuture future = channel.write(request);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline();
p.addAfter(
p.context(HttpRequestEncoder.class).name(),
"ws-encoder", new WebSocket00FrameEncoder());
if (future.isSuccess()) {
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
return request;
}
/**
@ -212,14 +185,12 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
* 8jKS'y:G*Co,Wxa-
* </pre>
*
* @param channel
* Channel
* @param response
* HTTP response returned from the server for the request sent by beginOpeningHandshake00().
* @throws WebSocketHandshakeException
*/
@Override
public void finishHandshake(Channel channel, FullHttpResponse response) {
protected void verify(FullHttpResponse response) {
final HttpResponseStatus status = new HttpResponseStatus(101, "WebSocket Protocol Handshake");
if (!response.getStatus().equals(status)) {
@ -244,16 +215,6 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
if (!challenge.equals(expectedChallengeResponseBytes)) {
throw new WebSocketHandshakeException("Invalid challenge");
}
String subprotocol = headers.get(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
setHandshakeComplete();
ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.replaceAndForward(HttpResponseDecoder.class,
"ws-decoder", new WebSocket00FrameDecoder(maxFramePayloadLength()));
}
private static String insertRandomCharacters(String key) {
@ -289,4 +250,14 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
return key;
}
@Override
protected ChannelInboundByteHandler newWebsocketDecoder() {
return new WebSocket00FrameDecoder(maxFramePayloadLength());
}
@Override
protected ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder() {
return new WebSocket00FrameEncoder();
}
}

View File

@ -15,11 +15,8 @@
*/
package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
@ -27,8 +24,6 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
@ -94,11 +89,9 @@ public class WebSocketClientHandshaker07 extends WebSocketClientHandshaker {
* Sec-WebSocket-Version: 7
* </pre>
*
* @param channel
* Channel into which we can write our request
*/
@Override
public ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
protected FullHttpRequest newHandshakeRequest() {
// Get path
URI wsURL = uri();
String path = wsURL.getPath();
@ -151,27 +144,7 @@ public class WebSocketClientHandshaker07 extends WebSocketClientHandshaker {
if (customHeaders != null) {
headers.add(customHeaders);
}
channel.pipeline().get(HttpResponseDecoder.class).setSingleDecode(true);
ChannelFuture future = channel.write(request);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline();
p.addAfter(
p.context(HttpRequestEncoder.class).name(),
"ws-encoder", new WebSocket07FrameEncoder(true));
if (future.isSuccess()) {
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
return request;
}
/**
@ -187,14 +160,12 @@ public class WebSocketClientHandshaker07 extends WebSocketClientHandshaker {
* Sec-WebSocket-Protocol: chat
* </pre>
*
* @param channel
* Channel
* @param response
* HTTP response returned from the server for the request sent by beginOpeningHandshake00().
* @throws WebSocketHandshakeException
*/
@Override
public void finishHandshake(Channel channel, FullHttpResponse response) {
protected void verify(FullHttpResponse response) {
final HttpResponseStatus status = HttpResponseStatus.SWITCHING_PROTOCOLS;
final HttpHeaders headers = response.headers();
@ -217,15 +188,15 @@ public class WebSocketClientHandshaker07 extends WebSocketClientHandshaker {
throw new WebSocketHandshakeException(String.format(
"Invalid challenge. Actual: %s. Expected: %s", accept, expectedChallengeResponseString));
}
}
String subprotocol = headers.get(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
@Override
protected ChannelInboundByteHandler newWebsocketDecoder() {
return new WebSocket07FrameDecoder(false, allowExtensions, maxFramePayloadLength());
}
setHandshakeComplete();
ChannelPipeline p = channel.pipeline();
p.replaceAndForward(HttpResponseDecoder.class,
"ws-decoder",
new WebSocket07FrameDecoder(false, allowExtensions, maxFramePayloadLength()));
@Override
protected ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder() {
return new WebSocket07FrameEncoder(true);
}
}

View File

@ -15,11 +15,8 @@
*/
package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
@ -27,8 +24,6 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
@ -94,11 +89,9 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
* Sec-WebSocket-Version: 8
* </pre>
*
* @param channel
* Channel into which we can write our request
*/
@Override
public ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
protected FullHttpRequest newHandshakeRequest() {
// Get path
URI wsURL = uri();
String path = wsURL.getPath();
@ -151,27 +144,7 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
if (customHeaders != null) {
headers.add(customHeaders);
}
channel.pipeline().get(HttpResponseDecoder.class).setSingleDecode(true);
ChannelFuture future = channel.write(request);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline();
p.addAfter(
p.context(HttpRequestEncoder.class).name(),
"ws-encoder", new WebSocket08FrameEncoder(true));
if (future.isSuccess()) {
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
return request;
}
/**
@ -187,14 +160,12 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
* Sec-WebSocket-Protocol: chat
* </pre>
*
* @param channel
* Channel
* @param response
* HTTP response returned from the server for the request sent by beginOpeningHandshake00().
* @throws WebSocketHandshakeException
*/
@Override
public void finishHandshake(Channel channel, FullHttpResponse response) {
protected void verify(FullHttpResponse response) {
final HttpResponseStatus status = HttpResponseStatus.SWITCHING_PROTOCOLS;
final HttpHeaders headers = response.headers();
@ -217,16 +188,15 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
throw new WebSocketHandshakeException(String.format(
"Invalid challenge. Actual: %s. Expected: %s", accept, expectedChallengeResponseString));
}
}
String subprotocol = headers.get(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
@Override
protected ChannelInboundByteHandler newWebsocketDecoder() {
return new WebSocket08FrameDecoder(false, allowExtensions, maxFramePayloadLength());
}
setHandshakeComplete();
ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.replaceAndForward(HttpResponseDecoder.class,
"ws-decoder",
new WebSocket08FrameDecoder(false, allowExtensions, maxFramePayloadLength()));
@Override
protected ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder() {
return new WebSocket08FrameEncoder(true);
}
}

View File

@ -15,20 +15,15 @@
*/
package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
@ -94,11 +89,9 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
* Sec-WebSocket-Version: 13
* </pre>
*
* @param channel
* Channel into which we can write our request
*/
@Override
public ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
protected FullHttpRequest newHandshakeRequest() {
// Get path
URI wsURL = uri();
String path = wsURL.getPath();
@ -124,7 +117,7 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
}
// Format request
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
HttpHeaders headers = request.headers();
headers.add(Names.UPGRADE, Values.WEBSOCKET.toLowerCase())
@ -151,26 +144,7 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
if (customHeaders != null) {
headers.add(customHeaders);
}
channel.pipeline().get(HttpResponseDecoder.class).setSingleDecode(true);
ChannelFuture future = channel.write(request);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline();
p.addAfter(
p.context(HttpRequestEncoder.class).name(),
"ws-encoder", new WebSocket13FrameEncoder(true));
if (future.isSuccess()) {
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
return request;
}
/**
@ -186,14 +160,12 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
* Sec-WebSocket-Protocol: chat
* </pre>
*
* @param channel
* Channel
* @param response
* HTTP response returned from the server for the request sent by beginOpeningHandshake00().
* @throws WebSocketHandshakeException
*/
@Override
public void finishHandshake(Channel channel, FullHttpResponse response) {
protected void verify(FullHttpResponse response) {
final HttpResponseStatus status = HttpResponseStatus.SWITCHING_PROTOCOLS;
final HttpHeaders headers = response.headers();
@ -216,16 +188,15 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
throw new WebSocketHandshakeException(String.format(
"Invalid challenge. Actual: %s. Expected: %s", accept, expectedChallengeResponseString));
}
}
String subprotocol = headers.get(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
@Override
protected ChannelInboundByteHandler newWebsocketDecoder() {
return new WebSocket13FrameDecoder(false, allowExtensions, maxFramePayloadLength());
}
setHandshakeComplete();
ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.replaceAndForward(HttpResponseDecoder.class,
"ws-decoder",
new WebSocket13FrameDecoder(false, allowExtensions, maxFramePayloadLength()));
@Override
protected ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder() {
return new WebSocket13FrameEncoder(true);
}
}

View File

@ -17,10 +17,22 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Collections;
import java.util.LinkedHashSet;
@ -30,6 +42,7 @@ import java.util.Set;
* Base class for server side web socket opening and closing handshakes
*/
public abstract class WebSocketServerHandshaker {
protected static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandshaker.class);
private static final String[] EMPTY_ARRAY = new String[0];
@ -129,9 +142,51 @@ public abstract class WebSocketServerHandshaker {
* @param promise
* the {@link ChannelPromise} to be notified when the opening handshake is done
*/
public abstract ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, ChannelPromise promise);
public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version %s server handshake", version(), channel.id()));
}
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
channel.write(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
if (ctx == null) {
// this means the user use a HttpServerCodec
ctx = p.context(HttpServerCodec.class);
if (ctx == null) {
promise.setFailure(
new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return;
}
p.addBefore(ctx.name(), "wsencoder", newWebsocketDecoder());
p.replaceAndForward(ctx.name(), "wsdecoder", newWebSocketEncoder());
} else {
p.replaceAndForward(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.replace(HttpResponseEncoder.class, "wsencoder", newWebSocketEncoder());
}
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
/**
* Returns a new {@link FullHttpResponse) which will be used for as response to the handshake request.
*/
protected abstract FullHttpResponse newHandshakeResponse(FullHttpRequest req,
HttpHeaders responseHeaders);
/**
* Performs the closing handshake
*
@ -157,7 +212,9 @@ public abstract class WebSocketServerHandshaker {
* @param promise
* the {@link ChannelPromise} to be notified when the closing handshake is done
*/
public abstract ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise);
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
return channel.write(frame, promise).addListener(ChannelFutureListener.CLOSE);
}
/**
* Selects the first matching supported sub protocol
@ -200,4 +257,14 @@ public abstract class WebSocketServerHandshaker {
selectedSubprotocol = value;
}
/**
* Returns the decoder to use after handshake is complete.
*/
protected abstract ChannelInboundByteHandler newWebsocketDecoder();
/**
* Returns the encoder to use after the handshake is complete.
*/
protected abstract ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder();
}

View File

@ -19,8 +19,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
@ -28,12 +28,7 @@ import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.regex.Pattern;
@ -53,8 +48,6 @@ import static io.netty.handler.codec.http.HttpVersion.*;
*/
public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandshaker00.class);
private static final Pattern BEGINNING_DIGIT = Pattern.compile("[^0-9]");
private static final Pattern BEGINNING_SPACE = Pattern.compile("[^ ]");
@ -115,11 +108,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
* </pre>
*/
@Override
public ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders headers, ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 00 server handshake", channel.id()));
}
protected FullHttpResponse newHandshakeResponse(FullHttpRequest req, HttpHeaders headers) {
// Serve the WebSocket handshake request.
if (!Values.UPGRADE.equalsIgnoreCase(req.headers().get(CONNECTION))
@ -178,24 +167,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
res.headers().add(WEBSOCKET_PROTOCOL, selectSubprotocol(protocol));
}
}
// Upgrade the connection and send the handshake response.
channel.write(res, promise);
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder",
new WebSocket00FrameDecoder(maxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket00FrameEncoder());
}
});
return promise;
return res;
}
/**
@ -210,4 +182,14 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
return channel.write(frame, promise);
}
@Override
protected ChannelInboundByteHandler newWebsocketDecoder() {
return new WebSocket00FrameDecoder(maxFramePayloadLength());
}
@Override
protected ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder() {
return new WebSocket00FrameEncoder();
}
}

View File

@ -15,23 +15,15 @@
*/
package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
@ -45,8 +37,6 @@ import static io.netty.handler.codec.http.HttpVersion.*;
*/
public class WebSocketServerHandshaker07 extends WebSocketServerHandshaker {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandshaker07.class);
public static final String WEBSOCKET_07_ACCEPT_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
private final boolean allowExtensions;
@ -105,11 +95,7 @@ public class WebSocketServerHandshaker07 extends WebSocketServerHandshaker {
* </pre>
*/
@Override
public ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders headers, ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 7 server handshake", channel.id()));
}
protected FullHttpResponse newHandshakeResponse(FullHttpRequest req, HttpHeaders headers) {
FullHttpResponse res =
new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
@ -143,38 +129,16 @@ public class WebSocketServerHandshaker07 extends WebSocketServerHandshaker {
setSelectedSubprotocol(selectedSubprotocol);
}
}
channel.write(res, promise);
// Upgrade the connection and send the handshake response.
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder",
new WebSocket07FrameDecoder(true, allowExtensions, maxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket07FrameEncoder(false));
}
});
return promise;
return res;
}
/**
* Echo back the closing frame and close the connection
*
* @param channel
* Channel
* @param frame
* Web Socket frame that was received
*/
@Override
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise future) {
future.addListener(ChannelFutureListener.CLOSE);
return channel.write(frame, future);
protected ChannelInboundByteHandler newWebsocketDecoder() {
return new WebSocket07FrameDecoder(true, allowExtensions, maxFramePayloadLength());
}
@Override
protected ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder() {
return new WebSocket07FrameEncoder(false);
}
}

View File

@ -15,23 +15,15 @@
*/
package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
@ -45,8 +37,6 @@ import static io.netty.handler.codec.http.HttpVersion.*;
*/
public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandshaker08.class);
public static final String WEBSOCKET_08_ACCEPT_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
private final boolean allowExtensions;
@ -106,13 +96,8 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
* </pre>
*/
@Override
public ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders headers, ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 8 server handshake", channel.id()));
}
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
protected FullHttpResponse newHandshakeResponse(FullHttpRequest req, HttpHeaders headers) {
FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
if (headers != null) {
res.headers().add(headers);
@ -143,38 +128,16 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
setSelectedSubprotocol(selectedSubprotocol);
}
}
channel.write(res, promise);
// Upgrade the connection and send the handshake response.
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder",
new WebSocket08FrameDecoder(true, allowExtensions, maxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket08FrameEncoder(false));
}
});
return promise;
return res;
}
/**
* Echo back the closing frame and close the connection
*
* @param channel
* Channel
* @param frame
* Web Socket frame that was received
*/
@Override
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
promise.addListener(ChannelFutureListener.CLOSE);
return channel.write(frame, promise);
protected ChannelInboundByteHandler newWebsocketDecoder() {
return new WebSocket08FrameDecoder(true, allowExtensions, maxFramePayloadLength());
}
@Override
protected ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder() {
return new WebSocket08FrameEncoder(false);
}
}

View File

@ -15,23 +15,15 @@
*/
package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.handler.codec.http.HttpHeaders.Values.*;
import static io.netty.handler.codec.http.HttpVersion.*;
@ -44,8 +36,6 @@ import static io.netty.handler.codec.http.HttpVersion.*;
*/
public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocketServerHandshaker13.class);
public static final String WEBSOCKET_13_ACCEPT_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
private final boolean allowExtensions;
@ -105,13 +95,8 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
* </pre>
*/
@Override
public ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders headers, ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Channel %s WS Version 13 server handshake", channel.id()));
}
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
protected FullHttpResponse newHandshakeResponse(FullHttpRequest req, HttpHeaders headers) {
FullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.SWITCHING_PROTOCOLS);
if (headers != null) {
res.headers().add(headers);
}
@ -142,38 +127,16 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
setSelectedSubprotocol(selectedSubprotocol);
}
}
channel.write(res, promise);
// Upgrade the connection and send the handshake response.
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.channel().pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder",
new WebSocket13FrameDecoder(true, allowExtensions, maxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket13FrameEncoder(false));
}
});
return promise;
return res;
}
/**
* Echo back the closing frame and close the connection
*
* @param channel
* Channel
* @param frame
* Web Socket frame that was received
*/
@Override
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
promise.addListener(ChannelFutureListener.CLOSE);
return channel.write(frame, promise);
protected ChannelInboundByteHandler newWebsocketDecoder() {
return new WebSocket13FrameDecoder(true, allowExtensions, maxFramePayloadLength());
}
@Override
protected ChannelOutboundMessageHandler<WebSocketFrame> newWebSocketEncoder() {
return new WebSocket13FrameEncoder(false);
}
}

View File

@ -45,10 +45,9 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
@ -90,8 +89,7 @@ public class WebSocketClient {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(8192));
pipeline.addLast("ws-handler", handler);
}

View File

@ -19,8 +19,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
/**
*/
@ -28,9 +27,8 @@ public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", new WebSocketServerHandler());
}
}