SPDY: refactor SpdyHttpEncoder

This commit is contained in:
Jeff Pinner 2013-06-06 19:25:50 -07:00 committed by Trustin Lee
parent 91ee0e5a56
commit af59aa6ccb

View File

@ -15,12 +15,14 @@
*/
package org.jboss.netty.handler.codec.spdy;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
@ -29,6 +31,7 @@ import org.jboss.netty.handler.codec.http.HttpMessage;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
@ -126,7 +129,7 @@ import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
public class SpdyHttpEncoder implements ChannelDownstreamHandler {
private final int spdyVersion;
private volatile int currentStreamID;
private volatile int currentStreamId;
/**
* Creates a new instance.
@ -155,8 +158,8 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
HttpRequest httpRequest = (HttpRequest) msg;
SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpRequest);
int streamID = spdySynStreamFrame.getStreamId();
ChannelFuture future = getContentFuture(ctx, e, streamID, httpRequest);
currentStreamId = spdySynStreamFrame.getStreamId();
ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpRequest);
Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress());
} else if (msg instanceof HttpResponse) {
@ -164,82 +167,106 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
HttpResponse httpResponse = (HttpResponse) msg;
if (httpResponse.containsHeader(SpdyHttpHeaders.Names.ASSOCIATED_TO_STREAM_ID)) {
SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpResponse);
int streamID = spdySynStreamFrame.getStreamId();
ChannelFuture future = getContentFuture(ctx, e, streamID, httpResponse);
currentStreamId = spdySynStreamFrame.getStreamId();
ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpResponse);
Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress());
} else {
SpdySynReplyFrame spdySynReplyFrame = createSynReplyFrame(httpResponse);
int streamID = spdySynReplyFrame.getStreamId();
ChannelFuture future = getContentFuture(ctx, e, streamID, httpResponse);
currentStreamId = spdySynReplyFrame.getStreamId();
ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpResponse);
Channels.write(ctx, future, spdySynReplyFrame, e.getRemoteAddress());
}
} else if (msg instanceof HttpChunk) {
HttpChunk chunk = (HttpChunk) msg;
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(currentStreamID);
spdyDataFrame.setData(chunk.getContent());
spdyDataFrame.setLast(chunk.isLast());
if (chunk instanceof HttpChunkTrailer) {
HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
List<Map.Entry<String, String>> trailers = trailer.getHeaders();
if (trailers.isEmpty()) {
Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
} else {
// Create SPDY HEADERS frame out of trailers
SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(currentStreamID);
for (Map.Entry<String, String> entry: trailers) {
spdyHeadersFrame.addHeader(entry.getKey(), entry.getValue());
}
// Write HEADERS frame and append Data Frame
ChannelFuture future = Channels.future(e.getChannel());
future.addListener(new SpdyFrameWriter(ctx, e, spdyDataFrame));
Channels.write(ctx, future, spdyHeadersFrame, e.getRemoteAddress());
}
} else {
Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
}
writeChunk(ctx, e.getFuture(), currentStreamId, chunk, e.getRemoteAddress());
} else {
// Unknown message type
ctx.sendDownstream(evt);
}
}
private static ChannelFuture getContentFuture(
ChannelHandlerContext ctx, MessageEvent e, int streamID, HttpMessage httpMessage) {
if (httpMessage.getContent().readableBytes() == 0) {
/**
* Writes an HTTP chunk downstream as one or more SPDY frames.
*/
protected void writeChunk(
ChannelHandlerContext ctx, ChannelFuture future,
int streamId, HttpChunk chunk, SocketAddress remoteAddress) {
if (chunk.isLast()) {
if (chunk instanceof HttpChunkTrailer) {
HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
List<Map.Entry<String, String>> trailers = trailer.getHeaders();
if (trailers.isEmpty()) {
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId);
spdyDataFrame.setLast(true);
Channels.write(ctx, future, spdyDataFrame, remoteAddress);
} else {
// Create SPDY HEADERS frame out of trailers
SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(streamId);
spdyHeadersFrame.setLast(true);
for (Map.Entry<String, String> entry: trailers) {
spdyHeadersFrame.addHeader(entry.getKey(), entry.getValue());
}
Channels.write(ctx, future, spdyHeadersFrame, remoteAddress);
}
} else {
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId);
spdyDataFrame.setLast(true);
Channels.write(ctx, future, spdyDataFrame, remoteAddress);
}
} else {
SpdyDataFrame[] spdyDataFrames = createSpdyDataFrames(streamId, chunk.getContent());
ChannelFuture dataFuture = getDataFuture(ctx, future, spdyDataFrames, remoteAddress);
// Trigger a write
dataFuture.setSuccess();
}
}
private ChannelFuture getMessageFuture(
ChannelHandlerContext ctx, MessageEvent e, int streamId, HttpMessage httpMessage) {
if (!httpMessage.getContent().readable()) {
return e.getFuture();
}
// Create SPDY Data Frame out of message content
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID);
spdyDataFrame.setData(httpMessage.getContent());
spdyDataFrame.setLast(true);
// Create SPDY Data Frames out of message content
SpdyDataFrame[] spdyDataFrames = createSpdyDataFrames(streamId, httpMessage.getContent());
if (spdyDataFrames.length > 0) {
spdyDataFrames[spdyDataFrames.length - 1].setLast(true);
}
// Create new future and add listener
ChannelFuture future = Channels.future(e.getChannel());
future.addListener(new SpdyFrameWriter(ctx, e, spdyDataFrame));
return getDataFuture(ctx, e.getFuture(), spdyDataFrames, e.getRemoteAddress());
}
return future;
private static ChannelFuture getDataFuture(
ChannelHandlerContext ctx, ChannelFuture future,
SpdyDataFrame[] spdyDataFrames, SocketAddress remoteAddress) {
ChannelFuture dataFuture = future;
for (int i = spdyDataFrames.length; --i >= 0;) {
future = Channels.future(ctx.getChannel());
future.addListener(new SpdyFrameWriter(ctx, new DownstreamMessageEvent(
ctx.getChannel(), dataFuture, spdyDataFrames[i], remoteAddress)));
dataFuture = future;
}
return dataFuture;
}
private static class SpdyFrameWriter implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final MessageEvent e;
private final Object spdyFrame;
SpdyFrameWriter(ChannelHandlerContext ctx, MessageEvent e, Object spdyFrame) {
SpdyFrameWriter(ChannelHandlerContext ctx, MessageEvent e) {
this.ctx = ctx;
this.e = e;
this.spdyFrame = spdyFrame;
}
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
Channels.write(ctx, e.getFuture(), spdyFrame, e.getRemoteAddress());
ctx.sendDownstream(e);
} else if (future.isCancelled()) {
e.getFuture().cancel();
} else {
@ -253,8 +280,8 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
boolean chunked = httpMessage.isChunked();
// Get the Stream-ID, Associated-To-Stream-ID, Priority, URL, and scheme from the headers
int streamID = SpdyHttpHeaders.getStreamId(httpMessage);
int associatedToStreamID = SpdyHttpHeaders.getAssociatedToStreamId(httpMessage);
int streamId = SpdyHttpHeaders.getStreamId(httpMessage);
int associatedToStreamId = SpdyHttpHeaders.getAssociatedToStreamId(httpMessage);
byte priority = SpdyHttpHeaders.getPriority(httpMessage);
String URL = SpdyHttpHeaders.getUrl(httpMessage);
String scheme = SpdyHttpHeaders.getScheme(httpMessage);
@ -272,7 +299,8 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
httpMessage.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
SpdySynStreamFrame spdySynStreamFrame =
new DefaultSpdySynStreamFrame(streamID, associatedToStreamID, priority);
new DefaultSpdySynStreamFrame(streamId, associatedToStreamId, priority);
spdySynStreamFrame.setLast(!chunked && !httpMessage.getContent().readable());
// Unfold the first line of the message into name/value pairs
if (httpMessage instanceof HttpRequest) {
@ -307,13 +335,6 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
spdySynStreamFrame.addHeader(entry.getKey(), entry.getValue());
}
if (chunked) {
currentStreamID = streamID;
spdySynStreamFrame.setLast(false);
} else {
spdySynStreamFrame.setLast(httpMessage.getContent().readableBytes() == 0);
}
return spdySynStreamFrame;
}
@ -322,7 +343,7 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
boolean chunked = httpResponse.isChunked();
// Get the Stream-ID from the headers
int streamID = SpdyHttpHeaders.getStreamId(httpResponse);
int streamId = SpdyHttpHeaders.getStreamId(httpResponse);
SpdyHttpHeaders.removeStreamId(httpResponse);
// The Connection, Keep-Alive, Proxy-Connection, and Transfer-Encoding
@ -332,7 +353,8 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
httpResponse.removeHeader("Proxy-Connection");
httpResponse.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
SpdySynReplyFrame spdySynReplyFrame = new DefaultSpdySynReplyFrame(streamID);
SpdySynReplyFrame spdySynReplyFrame = new DefaultSpdySynReplyFrame(streamId);
spdySynReplyFrame.setLast(!chunked && !httpResponse.getContent().readable());
// Unfold the first line of the response into name/value pairs
SpdyHeaders.setStatus(spdyVersion, spdySynReplyFrame, httpResponse.getStatus());
@ -343,13 +365,22 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler {
spdySynReplyFrame.addHeader(entry.getKey(), entry.getValue());
}
if (chunked) {
currentStreamID = streamID;
spdySynReplyFrame.setLast(false);
} else {
spdySynReplyFrame.setLast(httpResponse.getContent().readableBytes() == 0);
}
return spdySynReplyFrame;
}
private SpdyDataFrame[] createSpdyDataFrames(int streamId, ChannelBuffer content) {
int readableBytes = content.readableBytes();
int count = readableBytes / SPDY_MAX_LENGTH;
if (readableBytes % SPDY_MAX_LENGTH > 0) {
count++;
}
SpdyDataFrame[] spdyDataFrames = new SpdyDataFrame[count];
for (int i = 0; i < count; i ++) {
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId);
int dataSize = Math.min(content.readableBytes(), SPDY_MAX_LENGTH);
spdyDataFrame.setData(content.readSlice(dataSize));
spdyDataFrames[i] = spdyDataFrame;
}
return spdyDataFrames;
}
}