WebSocket enhancements
- Refactoring and adding suggestions from Norman and Vibul.
This commit is contained in:
parent
c6436ad470
commit
150e8b4105
@ -0,0 +1,99 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.http.websocketx;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
|
||||||
|
import static io.netty.handler.codec.http.HttpMethod.GET;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||||
|
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaders;
|
||||||
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
|
import io.netty.handler.codec.http.HttpResponse;
|
||||||
|
import io.netty.logging.InternalLogger;
|
||||||
|
import io.netty.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles the HTTP handshake (the HTTP Upgrade request)
|
||||||
|
*/
|
||||||
|
class WebSocketServerHandshakeHandler extends ChannelInboundMessageHandlerAdapter<HttpRequest> {
|
||||||
|
|
||||||
|
private static final InternalLogger logger =
|
||||||
|
InternalLoggerFactory.getInstance(WebSocketServerHandshakeHandler.class);
|
||||||
|
private final String websocketPath;
|
||||||
|
private final String subprotocols;
|
||||||
|
private final boolean allowExtensions;
|
||||||
|
|
||||||
|
public WebSocketServerHandshakeHandler(String websocketPath, String subprotocols, boolean allowExtensions) {
|
||||||
|
this.websocketPath = websocketPath;
|
||||||
|
this.subprotocols = subprotocols;
|
||||||
|
this.allowExtensions = allowExtensions;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageReceived(final ChannelHandlerContext ctx, HttpRequest req) throws Exception {
|
||||||
|
if (req.getMethod() != GET) {
|
||||||
|
sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
|
||||||
|
getWebSocketLocation(req, websocketPath), subprotocols, allowExtensions);
|
||||||
|
final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
|
||||||
|
if (handshaker == null) {
|
||||||
|
wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
|
||||||
|
handshakeFuture.addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
ctx.fireExceptionCaught(future.cause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
WebSocketServerProtocolHandler.setHandshaker(ctx, handshaker);
|
||||||
|
ctx.pipeline().replace(this, "WS403Responder",
|
||||||
|
WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());
|
||||||
|
} catch (WebSocketHandshakeException e) {
|
||||||
|
ctx.fireExceptionCaught(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
logger.error("Exception Caught", cause);
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
|
||||||
|
ChannelFuture f = ctx.channel().write(res);
|
||||||
|
if (!isKeepAlive(req) || res.getStatus().getCode() != 200) {
|
||||||
|
f.addListener(ChannelFutureListener.CLOSE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getWebSocketLocation(HttpRequest req, String path) {
|
||||||
|
return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + path;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,112 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.http.websocketx;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.ChannelHandler;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||||
|
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
|
import io.netty.util.AttributeKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles WebSocket control frames (Close, Ping, Pong) and data frames (Text and Binary) are passed
|
||||||
|
* to the next handler in the pipeline.
|
||||||
|
*/
|
||||||
|
public class WebSocketServerProtocolHandler extends ChannelInboundMessageHandlerAdapter<WebSocketFrame> {
|
||||||
|
|
||||||
|
private static final AttributeKey<WebSocketServerHandshaker> HANDSHAKER_ATTR_KEY =
|
||||||
|
new AttributeKey<WebSocketServerHandshaker>(WebSocketServerHandshaker.class.getName());
|
||||||
|
|
||||||
|
private final String websocketPath;
|
||||||
|
private final String subprotocols;
|
||||||
|
private final boolean allowExtensions;
|
||||||
|
|
||||||
|
public WebSocketServerProtocolHandler(String websocketPath) {
|
||||||
|
this(websocketPath, null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketServerProtocolHandler(String websocketPath, String subprotocols) {
|
||||||
|
this(websocketPath, subprotocols, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketServerProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions) {
|
||||||
|
this.websocketPath = websocketPath;
|
||||||
|
this.subprotocols = subprotocols;
|
||||||
|
this.allowExtensions = allowExtensions;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterAdd(ChannelHandlerContext ctx) {
|
||||||
|
// Add the WebSocketHandshakeHandler before this one.
|
||||||
|
ctx.pipeline().addBefore(ctx.name(), WebSocketServerHandshakeHandler.class.getName(),
|
||||||
|
new WebSocketServerHandshakeHandler(websocketPath, subprotocols, allowExtensions));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
|
||||||
|
if (frame instanceof CloseWebSocketFrame) {
|
||||||
|
WebSocketServerHandshaker handshaker = WebSocketServerProtocolHandler.getHandshaker(ctx);
|
||||||
|
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
|
||||||
|
return;
|
||||||
|
} else if (frame instanceof PingWebSocketFrame) {
|
||||||
|
ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.nextInboundMessageBuffer().add(frame);
|
||||||
|
ctx.fireInboundBufferUpdated();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
try {
|
||||||
|
if (cause instanceof WebSocketHandshakeException) {
|
||||||
|
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
|
||||||
|
response.setContent(Unpooled.wrappedBuffer(cause.getMessage().getBytes()));
|
||||||
|
ctx.channel().write(response);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static WebSocketServerHandshaker getHandshaker(ChannelHandlerContext ctx) {
|
||||||
|
return ctx.attr(HANDSHAKER_ATTR_KEY).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setHandshaker(ChannelHandlerContext ctx, WebSocketServerHandshaker handshaker) {
|
||||||
|
ctx.attr(HANDSHAKER_ATTR_KEY).set(handshaker);
|
||||||
|
}
|
||||||
|
|
||||||
|
static ChannelHandler forbiddenHttpRequestResponder() {
|
||||||
|
return new ChannelInboundMessageHandlerAdapter<Object>() {
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
|
if (!(msg instanceof WebSocketFrame)) {
|
||||||
|
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN);
|
||||||
|
ctx.channel().write(response);
|
||||||
|
} else {
|
||||||
|
ctx.nextInboundMessageBuffer().add(msg);
|
||||||
|
ctx.fireInboundBufferUpdated();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,131 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License, version
|
||||||
|
* 2.0 (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.http.websocketx;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import io.netty.handler.codec.http.DefaultHttpRequest;
|
||||||
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
|
import io.netty.handler.codec.http.HttpVersion;
|
||||||
|
import io.netty.handler.codec.http.HttpHeaders.Names;
|
||||||
|
|
||||||
|
public class WebSocketRequestBuilder {
|
||||||
|
|
||||||
|
private HttpVersion httpVersion;
|
||||||
|
private HttpMethod method;
|
||||||
|
private String uri;
|
||||||
|
private String host;
|
||||||
|
private String upgrade;
|
||||||
|
private String connection;
|
||||||
|
private String key;
|
||||||
|
private String origin;
|
||||||
|
private WebSocketVersion version;
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder httpVersion(HttpVersion httpVersion) {
|
||||||
|
this.httpVersion = httpVersion;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder method(HttpMethod method) {
|
||||||
|
this.method = method;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder uri(String uri) {
|
||||||
|
this.uri = uri;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder host(String host) {
|
||||||
|
this.host = host;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder upgrade(String upgrade) {
|
||||||
|
this.upgrade = upgrade;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder connection(String connection) {
|
||||||
|
this.connection = connection;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder key(String key) {
|
||||||
|
this.key = key;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder origin(String origin) {
|
||||||
|
this.origin = origin;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder version13() {
|
||||||
|
this.version = WebSocketVersion.V13;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder version8() {
|
||||||
|
this.version = WebSocketVersion.V08;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder version00() {
|
||||||
|
this.version = null;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebSocketRequestBuilder noVersion() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpRequest build() {
|
||||||
|
HttpRequest req = new DefaultHttpRequest(httpVersion, method, uri);
|
||||||
|
if (host != null) {
|
||||||
|
req.setHeader(Names.HOST, host);
|
||||||
|
}
|
||||||
|
if (upgrade != null) {
|
||||||
|
req.setHeader(Names.UPGRADE, upgrade);
|
||||||
|
}
|
||||||
|
if (connection != null) {
|
||||||
|
req.setHeader(Names.CONNECTION, connection);
|
||||||
|
}
|
||||||
|
if (key != null) {
|
||||||
|
req.setHeader(Names.SEC_WEBSOCKET_KEY, key);
|
||||||
|
}
|
||||||
|
if (origin != null) {
|
||||||
|
req.setHeader(Names.SEC_WEBSOCKET_ORIGIN, origin);
|
||||||
|
}
|
||||||
|
if (version != null) {
|
||||||
|
req.setHeader(Names.SEC_WEBSOCKET_VERSION, version.toHttpHeaderValue());
|
||||||
|
}
|
||||||
|
return req;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HttpRequest sucessful() {
|
||||||
|
return new WebSocketRequestBuilder().httpVersion(HTTP_1_1)
|
||||||
|
.method(HttpMethod.GET)
|
||||||
|
.uri("/test")
|
||||||
|
.host("server.example.com")
|
||||||
|
.upgrade(WEBSOCKET.toLowerCase())
|
||||||
|
.key("dGhlIHNhbXBsZSBub25jZQ==")
|
||||||
|
.origin("http://example.com")
|
||||||
|
.version13()
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,164 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License, version
|
||||||
|
* 2.0 (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec.http.websocketx;
|
||||||
|
|
||||||
|
import static io.netty.handler.codec.http.HttpHeaders.Values.WEBSOCKET;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
|
||||||
|
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
||||||
|
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import io.netty.buffer.MessageBuf;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelHandler;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||||
|
import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
|
||||||
|
import io.netty.channel.embedded.EmbeddedMessageChannel;
|
||||||
|
import io.netty.handler.codec.http.DefaultHttpRequest;
|
||||||
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
|
import io.netty.handler.codec.http.HttpRequest;
|
||||||
|
import io.netty.handler.codec.http.HttpRequestDecoder;
|
||||||
|
import io.netty.handler.codec.http.HttpResponse;
|
||||||
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
|
import io.netty.handler.codec.http.HttpVersion;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class WebSocketServerProtocolHandlerTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHttpUpgradeRequest() throws Exception {
|
||||||
|
EmbeddedMessageChannel ch = createChannel(new MockOutboundHandler());
|
||||||
|
ChannelHandlerContext handshakerCtx = ch.pipeline().context(WebSocketServerHandshakeHandler.class);
|
||||||
|
|
||||||
|
writeUpgradeRequest(ch);
|
||||||
|
|
||||||
|
assertEquals(SWITCHING_PROTOCOLS, ((HttpResponse) ch.outboundMessageBuffer().poll()).getStatus());
|
||||||
|
assertNotNull(WebSocketServerProtocolHandler.getHandshaker(handshakerCtx));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubsequentHttpRequestsAfterUpgradeShouldReturn403() throws Exception {
|
||||||
|
EmbeddedMessageChannel ch = createChannel(new MockOutboundHandler());
|
||||||
|
|
||||||
|
writeUpgradeRequest(ch);
|
||||||
|
assertEquals(SWITCHING_PROTOCOLS, ((HttpResponse) ch.outboundMessageBuffer().poll()).getStatus());
|
||||||
|
|
||||||
|
ch.writeInbound(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/test"));
|
||||||
|
assertEquals(FORBIDDEN, ((HttpResponse) ch.outboundMessageBuffer().poll()).getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHttpUpgradeRequestInvalidUpgradeHeader() {
|
||||||
|
EmbeddedMessageChannel ch = createChannel();
|
||||||
|
HttpRequest httpRequest = new WebSocketRequestBuilder().httpVersion(HTTP_1_1)
|
||||||
|
.method(HttpMethod.GET)
|
||||||
|
.uri("/test")
|
||||||
|
.connection("Upgrade")
|
||||||
|
.version00()
|
||||||
|
.upgrade("BogusSocket")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
ch.writeInbound(httpRequest);
|
||||||
|
|
||||||
|
HttpResponse response = getHttpResponse(ch);
|
||||||
|
assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
|
||||||
|
assertEquals("not a WebSocket handshake request: missing upgrade", getResponseMessage(response));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHttpUpgradeRequestMissingWSKeyHeader() {
|
||||||
|
EmbeddedMessageChannel ch = createChannel();
|
||||||
|
HttpRequest httpRequest = new WebSocketRequestBuilder().httpVersion(HTTP_1_1)
|
||||||
|
.method(HttpMethod.GET)
|
||||||
|
.uri("/test")
|
||||||
|
.key(null)
|
||||||
|
.connection("Upgrade")
|
||||||
|
.upgrade(WEBSOCKET.toLowerCase())
|
||||||
|
.version13()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
ch.writeInbound(httpRequest);
|
||||||
|
|
||||||
|
HttpResponse response = getHttpResponse(ch);
|
||||||
|
assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
|
||||||
|
assertEquals("not a WebSocket request: missing key", getResponseMessage(response));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandleTextFrame() {
|
||||||
|
CustomTextFrameHandler customTextFrameHandler = new CustomTextFrameHandler();
|
||||||
|
EmbeddedMessageChannel ch = createChannel(customTextFrameHandler);
|
||||||
|
writeUpgradeRequest(ch);
|
||||||
|
// Removing the HttpRequestDecoder as we are writing a TextWebSocketFrame so decoding is not neccessary.
|
||||||
|
ch.pipeline().remove(HttpRequestDecoder.class);
|
||||||
|
|
||||||
|
ch.writeInbound(new TextWebSocketFrame("payload"));
|
||||||
|
|
||||||
|
assertEquals("processed: payload", customTextFrameHandler.getContent());
|
||||||
|
}
|
||||||
|
|
||||||
|
private EmbeddedMessageChannel createChannel() {
|
||||||
|
return createChannel(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private EmbeddedMessageChannel createChannel(ChannelHandler handler) {
|
||||||
|
return new EmbeddedMessageChannel(
|
||||||
|
new WebSocketServerProtocolHandler("/test", null, false),
|
||||||
|
new HttpRequestDecoder(),
|
||||||
|
new HttpResponseEncoder(),
|
||||||
|
new MockOutboundHandler(),
|
||||||
|
handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeUpgradeRequest(EmbeddedMessageChannel ch) {
|
||||||
|
ch.writeInbound(WebSocketRequestBuilder.sucessful());
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getResponseMessage(HttpResponse response) {
|
||||||
|
return new String(response.getContent().array());
|
||||||
|
}
|
||||||
|
|
||||||
|
private HttpResponse getHttpResponse(EmbeddedMessageChannel ch) {
|
||||||
|
MessageBuf<Object> outbound = ch.pipeline().context(MockOutboundHandler.class).outboundMessageBuffer();
|
||||||
|
return (HttpResponse) outbound.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class MockOutboundHandler extends ChannelOutboundMessageHandlerAdapter<Object> {
|
||||||
|
@Override
|
||||||
|
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
|
||||||
|
//NoOp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class CustomTextFrameHandler extends ChannelInboundMessageHandlerAdapter<TextWebSocketFrame> {
|
||||||
|
private String content;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
|
||||||
|
content = "processed: " + msg.getText();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getContent() {
|
||||||
|
return content;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user