More robust pipeline manipulation while upgrading to WebSocket

- This commit allows a user to write its first web socket frame right after calling WebSocketServerHandshaker.handshake() rather than adding a listener to the future it returns.
- Should fix #1933
This commit is contained in:
Norman Maurer 2013-10-18 18:22:20 +02:00
parent 3a01bf1064
commit 3367e51882

View File

@ -159,11 +159,7 @@ public abstract class WebSocketServerHandshaker {
logger.debug(String.format("%s WS Version %s server handshake", channel, version())); logger.debug(String.format("%s WS Version %s server handshake", channel, version()));
} }
FullHttpResponse response = newHandshakeResponse(req, responseHeaders); FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
channel.writeAndFlush(response).addListener(new ChannelFutureListener() { ChannelPipeline p = channel.pipeline();
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
if (p.get(HttpObjectAggregator.class) != null) { if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class); p.remove(HttpObjectAggregator.class);
} }
@ -171,21 +167,30 @@ public abstract class WebSocketServerHandshaker {
p.remove(HttpContentCompressor.class); p.remove(HttpContentCompressor.class);
} }
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class); ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
final String encoderName;
if (ctx == null) { if (ctx == null) {
// this means the user use a HttpServerCodec // this means the user use a HttpServerCodec
ctx = p.context(HttpServerCodec.class); ctx = p.context(HttpServerCodec.class);
if (ctx == null) { if (ctx == null) {
promise.setFailure( promise.setFailure(
new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline")); new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return; return promise;
} }
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder()); p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.replace(ctx.name(), "wsencoder", newWebSocketEncoder()); p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
encoderName = ctx.name();
} else { } else {
p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder()); p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.replace(HttpResponseEncoder.class, "wsencoder", newWebSocketEncoder()); encoderName = p.context(HttpResponseEncoder.class).name();
p.addAfter(encoderName, "wsencoder", newWebSocketEncoder());
} }
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
p.remove(encoderName);
promise.setSuccess(); promise.setSuccess();
} else { } else {
promise.setFailure(future.cause()); promise.setFailure(future.cause());