Used org.jboss.netty.handler.codec.http instead of custom decoder for maximum compatibility

This commit is contained in:
Trustin Lee 2009-06-30 10:03:41 +00:00
parent 307c00e073
commit 012066cfa9
2 changed files with 90 additions and 73 deletions

View File

@ -25,7 +25,6 @@ package org.jboss.netty.channel.socket.http;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@ -62,7 +61,7 @@ final class HttpTunnelWorker implements Runnable {
}
}
byte[] buf;
ChannelBuffer buf;
try {
buf = channel.receiveChunk();
}
@ -74,11 +73,7 @@ final class HttpTunnelWorker implements Runnable {
}
if (buf != null) {
fireMessageReceived(
channel,
ChannelBuffers.wrappedBuffer(
channel.getConfig().getBufferFactory().getDefaultOrder(),
buf));
fireMessageReceived(channel, buf);
}
}

View File

@ -22,8 +22,7 @@
*/
package org.jboss.netty.channel.socket.http;
import static org.jboss.netty.channel.Channels.fireChannelOpen;
import static org.jboss.netty.channel.Channels.fireExceptionCaught;
import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -37,7 +36,6 @@ import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
@ -51,7 +49,19 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.http.Cookie;
import org.jboss.netty.handler.codec.http.CookieDecoder;
import org.jboss.netty.handler.codec.http.CookieEncoder;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
@ -68,6 +78,8 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
static final InternalLogger logger =
InternalLoggerFactory.getInstance(HttpTunnelingClientSocketChannel.class);
private static final String JSESSIONID = "JSESSIONID";
private final HttpTunnelingSocketChannelConfig config;
private final Lock reconnectLock = new ReentrantLock();
@ -83,13 +95,12 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
volatile boolean closed = false;
final BlockingQueue<byte[]> messages = new LinkedTransferQueue<byte[]>();
final BlockingQueue<ChannelBuffer> messages = new LinkedTransferQueue<ChannelBuffer>();
private final ClientSocketChannelFactory clientSocketChannelFactory;
volatile SocketChannel channel;
private final DelimiterBasedFrameDecoder decoder = new DelimiterBasedFrameDecoder(8092, ChannelBuffers.wrappedBuffer(new byte[] { '\r', '\n' }));
private final HttpTunnelingClientSocketChannel.ServletChannelHandler handler = new ServletChannelHandler();
volatile HttpTunnelAddress remoteAddress;
@ -184,26 +195,27 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
}
// Send the HTTP request.
StringBuilder builder = new StringBuilder();
builder.append("POST ").append(url.getRawPath()).append(" HTTP/1.1").append(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR).
append("Host: ").append(url.getHost()).append(":").append(url.getPort()).append(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR).
append("Content-Type: application/octet-stream").append(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR).append("Transfer-Encoding: chunked").
append(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR).append("Content-Transfer-Encoding: Binary").append(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR).append("Connection: Keep-Alive").
append(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR);
if (reconnect) {
builder.append("Cookie: JSESSIONID=").append(sessionId).append(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR);
HttpRequest req = new DefaultHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, url.getRawPath());
req.setHeader(HttpHeaders.Names.HOST, url.getHost());
req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
if (sessionId != null) {
CookieEncoder ce = new CookieEncoder(false);
ce.addCookie(JSESSIONID, sessionId);
req.setHeader(HttpHeaders.Names.COOKIE, ce.encode());
}
builder.append(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR);
String msg = builder.toString();
channel.write(ChannelBuffers.copiedBuffer(msg, "ASCII"));
channel.write(req);
}
/**
*
*/
private void createSocketChannel() {
DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
channelPipeline.addLast("decoder", decoder);
// TODO Expose the codec options via HttpTunnelingSocketChannelConfig
channelPipeline.addLast("decoder", new HttpResponseDecoder());
channelPipeline.addLast("encoder", new HttpRequestEncoder());
channelPipeline.addLast("handler", handler);
channel = clientSocketChannelFactory.newChannel(channelPipeline);
}
@ -213,18 +225,14 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
String hex = Integer.toHexString(size) + HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR;
synchronized (writeLock) {
ChannelFuture future = channel.write(ChannelBuffers.wrappedBuffer(
ChannelBuffers.copiedBuffer(hex, "ASCII"),
a,
ChannelBuffers.copiedBuffer(HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR, "ASCII")));
future.awaitUninterruptibly();
channel.write(new DefaultHttpChunk(a)).awaitUninterruptibly();
}
return size + hex.length() + HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR.length();
}
byte[] receiveChunk() {
byte[] buf = null;
ChannelBuffer receiveChunk() {
ChannelBuffer buf = null;
try {
buf = messages.take();
}
@ -259,12 +267,7 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
if (setClosed()) {
// Send the end of chunk.
synchronized (writeLock) {
ChannelFuture future = channel.write(ChannelBuffers.copiedBuffer(
"0" +
HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR +
HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR,
"ASCII"));
future.awaitUninterruptibly();
channel.write(HttpChunk.LAST_CHUNK).awaitUninterruptibly();
}
closed = true;
@ -278,49 +281,52 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
@ChannelPipelineCoverage("one")
class ServletChannelHandler extends SimpleChannelUpstreamHandler {
private volatile boolean readingChunks;
int nextChunkSize = -1;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(0, bytes);
if (awaitingInitialResponse) {
// XXX: This is fragile - should use HTTP codec.
if (!readingChunks) {
HttpResponse res = (HttpResponse) e.getMessage();
String newSessionId = null;
newSessionId = getSessionId(res, HttpHeaders.Names.SET_COOKIE);
if (newSessionId == null) {
newSessionId = getSessionId(res, HttpHeaders.Names.SET_COOKIE2);
}
if (newSessionId == null) {
// XXX: Server does not support JSESSIONID?
}
// XXX: Utilize keep-alive if possible to reduce reconnection overhead.
// XXX: Consider non-200 status code.
// If the status code is not 200, no more reconnection attempt
// should be made.
String line = new String(bytes);
if (line.contains("Set-Cookie")) {
// XXX: Session ID length can be different between containers.
// XXX: If the session ID in the response is different from
// the session ID specified in the request, then it means
// the session has timed out. If so, channel must be closed.
int start = line.indexOf("JSESSIONID=") + 11;
int end = line.indexOf(";", start);
sessionId = line.substring(start, end);
}
else if (line.equals("")) {
awaitingInitialResponse = false;
}
}
else {
if(nextChunkSize == -1) {
String hex = new String(bytes);
nextChunkSize = Integer.parseInt(hex, 16);
if(nextChunkSize == 0) {
if(!closed) {
nextChunkSize = -1;
awaitingInitialResponse = true;
reconnect();
}
// XXX: If the session ID in the response is different from
// the session ID specified in the request, then it means
// the session has timed out. If so, channel must be closed.
sessionId = newSessionId;
if (res.isChunked()) {
readingChunks = true;
} else {
ChannelBuffer content = res.getContent();
if (content.readable()) {
System.out.println("1: " + content.toString("ISO-8859-1"));
messages.offer(content);
}
}
else {
messages.put(bytes);
nextChunkSize = -1;
} else {
HttpChunk chunk = (HttpChunk) e.getMessage();
if (!chunk.isLast()) {
System.out.println("2: " + chunk.getContent());
messages.offer(chunk.getContent());
} else {
readingChunks = false;
}
}
}
@ -332,4 +338,20 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
channel.close();
}
}
static String getSessionId(HttpResponse res, String headerName) {
CookieDecoder decoder = null;
for (String v: res.getHeaders(headerName)) {
if (decoder == null) {
decoder = new CookieDecoder();
}
for (Cookie c: decoder.decode(v)) {
if (c.getName().equals(JSESSIONID)) {
return c.getValue();
}
}
}
return null;
}
}