More robust pipeline manipulation while upgrading to WebSocket

- This commit allows a user to write its first web socket frame right after calling WebSocketServerHandshaker.handshake() rather than adding a listener to the future it returns.
- Should fix #1933
This commit is contained in:
Trustin Lee 2013-10-18 19:35:38 +09:00
parent 68e3b52a2a
commit 3b324e9515
6 changed files with 55 additions and 99 deletions

View File

@ -15,13 +15,13 @@
*/
package org.jboss.netty.handler.codec.http.websocketx;
import java.net.URI;
import java.util.Map;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.handler.codec.http.HttpResponse;
import java.net.URI;
import java.util.Map;
/**
* Base class for web socket client handshake implementations
*/
@ -31,7 +31,7 @@ public abstract class WebSocketClientHandshaker {
private final WebSocketVersion version;
private boolean handshakeComplete;
private volatile boolean handshakeComplete;
private final String expectedSubprotocol;

View File

@ -18,9 +18,15 @@ package org.jboss.netty.handler.codec.http.websocketx;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.util.internal.StringUtil;
import java.util.Collections;
@ -148,6 +154,30 @@ public abstract class WebSocketServerHandshaker {
*/
public abstract ChannelFuture handshake(Channel channel, HttpRequest req);
/**
* Upgrades the connection and send the handshake response.
*/
protected ChannelFuture writeHandshakeResponse(
Channel channel, HttpResponse res, ChannelHandler encoder, ChannelHandler decoder) {
final ChannelPipeline p = channel.getPipeline();
if (p.get(HttpChunkAggregator.class) != null) {
p.remove(HttpChunkAggregator.class);
}
final String httpEncoderName = p.getContext(HttpResponseEncoder.class).getName();
p.addAfter(httpEncoderName, "wsencoder", encoder);
p.get(HttpRequestDecoder.class).replace("wsdecoder", decoder);
final ChannelFuture future = channel.write(res);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
p.remove(httpEncoderName);
}
});
return future;
}
/**
* Performs the closing handshake
*

View File

@ -15,28 +15,23 @@
*/
package org.jboss.netty.handler.codec.http.websocketx;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.*;
/**
* <p>
* Performs server side opening and closing handshakes for web socket specification version <a
@ -184,23 +179,8 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
}
}
ChannelFuture future = channel.write(res);
// Upgrade the connection and send the handshake response.
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.getChannel().getPipeline();
if (p.get(HttpChunkAggregator.class) != null) {
p.remove(HttpChunkAggregator.class);
}
p.get(HttpRequestDecoder.class).replace("wsdecoder",
new WebSocket00FrameDecoder(getMaxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket00FrameEncoder());
}
});
return future;
return writeHandshakeResponse(
channel, res, new WebSocket00FrameEncoder(), new WebSocket00FrameDecoder(getMaxFramePayloadLength()));
}
/**

View File

@ -20,14 +20,10 @@ import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
@ -144,23 +140,9 @@ public class WebSocketServerHandshaker07 extends WebSocketServerHandshaker {
}
}
ChannelFuture future = channel.write(res);
// Upgrade the connection and send the handshake response.
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.getChannel().getPipeline();
if (p.get(HttpChunkAggregator.class) != null) {
p.remove(HttpChunkAggregator.class);
}
p.get(HttpRequestDecoder.class).replace("wsdecoder",
new WebSocket07FrameDecoder(true, allowExtensions, getMaxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket07FrameEncoder(false));
}
});
return future;
return writeHandshakeResponse(
channel, res, new WebSocket07FrameEncoder(false),
new WebSocket07FrameDecoder(true, allowExtensions, getMaxFramePayloadLength()));
}
/**

View File

@ -15,27 +15,23 @@
*/
package org.jboss.netty.handler.codec.http.websocketx;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.CharsetUtil;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.*;
/**
* <p>
* Performs server side opening and closing handshakes for web socket specification version <a
@ -162,23 +158,9 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker {
}
}
ChannelFuture future = channel.write(res);
// Upgrade the connection and send the handshake response.
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.getChannel().getPipeline();
if (p.get(HttpChunkAggregator.class) != null) {
p.remove(HttpChunkAggregator.class);
}
p.get(HttpRequestDecoder.class).replace("wsdecoder",
new WebSocket08FrameDecoder(true, allowExtensions, getMaxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket08FrameEncoder(false));
}
});
return future;
return writeHandshakeResponse(
channel, res, new WebSocket08FrameEncoder(false),
new WebSocket08FrameDecoder(true, allowExtensions, getMaxFramePayloadLength()));
}
/**

View File

@ -15,27 +15,23 @@
*/
package org.jboss.netty.handler.codec.http.websocketx;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.CharsetUtil;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
import static org.jboss.netty.handler.codec.http.HttpVersion.*;
/**
* <p>
* Performs server side opening and closing handshakes for <a
@ -168,23 +164,9 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker {
}
}
ChannelFuture future = channel.write(res);
// Upgrade the connection and send the handshake response.
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
ChannelPipeline p = future.getChannel().getPipeline();
if (p.get(HttpChunkAggregator.class) != null) {
p.remove(HttpChunkAggregator.class);
}
p.get(HttpRequestDecoder.class).replace("wsdecoder",
new WebSocket13FrameDecoder(true, allowExtensions, getMaxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket13FrameEncoder(false));
}
});
return future;
return writeHandshakeResponse(
channel, res, new WebSocket13FrameEncoder(false),
new WebSocket13FrameDecoder(true, allowExtensions, getMaxFramePayloadLength()));
}
/**