diff --git a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyHttpEncoder.java b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyHttpEncoder.java index fef1ed7a3a..d6f2571a8f 100644 --- a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyHttpEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyHttpEncoder.java @@ -15,12 +15,14 @@ */ package org.jboss.netty.handler.codec.spdy; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.DownstreamMessageEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpChunkTrailer; @@ -29,6 +31,7 @@ import org.jboss.netty.handler.codec.http.HttpMessage; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; +import java.net.SocketAddress; import java.util.List; import java.util.Map; @@ -126,7 +129,7 @@ import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*; public class SpdyHttpEncoder implements ChannelDownstreamHandler { private final int spdyVersion; - private volatile int currentStreamID; + private volatile int currentStreamId; /** * Creates a new instance. @@ -155,8 +158,8 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { HttpRequest httpRequest = (HttpRequest) msg; SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpRequest); - int streamID = spdySynStreamFrame.getStreamId(); - ChannelFuture future = getContentFuture(ctx, e, streamID, httpRequest); + currentStreamId = spdySynStreamFrame.getStreamId(); + ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpRequest); Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress()); } else if (msg instanceof HttpResponse) { @@ -164,82 +167,106 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { HttpResponse httpResponse = (HttpResponse) msg; if (httpResponse.containsHeader(SpdyHttpHeaders.Names.ASSOCIATED_TO_STREAM_ID)) { SpdySynStreamFrame spdySynStreamFrame = createSynStreamFrame(httpResponse); - int streamID = spdySynStreamFrame.getStreamId(); - ChannelFuture future = getContentFuture(ctx, e, streamID, httpResponse); + currentStreamId = spdySynStreamFrame.getStreamId(); + ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpResponse); Channels.write(ctx, future, spdySynStreamFrame, e.getRemoteAddress()); } else { SpdySynReplyFrame spdySynReplyFrame = createSynReplyFrame(httpResponse); - int streamID = spdySynReplyFrame.getStreamId(); - ChannelFuture future = getContentFuture(ctx, e, streamID, httpResponse); + currentStreamId = spdySynReplyFrame.getStreamId(); + ChannelFuture future = getMessageFuture(ctx, e, currentStreamId, httpResponse); Channels.write(ctx, future, spdySynReplyFrame, e.getRemoteAddress()); } } else if (msg instanceof HttpChunk) { HttpChunk chunk = (HttpChunk) msg; - SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(currentStreamID); - spdyDataFrame.setData(chunk.getContent()); - spdyDataFrame.setLast(chunk.isLast()); - - if (chunk instanceof HttpChunkTrailer) { - HttpChunkTrailer trailer = (HttpChunkTrailer) chunk; - List> trailers = trailer.getHeaders(); - if (trailers.isEmpty()) { - Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress()); - } else { - // Create SPDY HEADERS frame out of trailers - SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(currentStreamID); - for (Map.Entry entry: trailers) { - spdyHeadersFrame.addHeader(entry.getKey(), entry.getValue()); - } - - // Write HEADERS frame and append Data Frame - ChannelFuture future = Channels.future(e.getChannel()); - future.addListener(new SpdyFrameWriter(ctx, e, spdyDataFrame)); - Channels.write(ctx, future, spdyHeadersFrame, e.getRemoteAddress()); - } - } else { - Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress()); - } + writeChunk(ctx, e.getFuture(), currentStreamId, chunk, e.getRemoteAddress()); } else { // Unknown message type ctx.sendDownstream(evt); } } - private static ChannelFuture getContentFuture( - ChannelHandlerContext ctx, MessageEvent e, int streamID, HttpMessage httpMessage) { - if (httpMessage.getContent().readableBytes() == 0) { + /** + * Writes an HTTP chunk downstream as one or more SPDY frames. + */ + protected void writeChunk( + ChannelHandlerContext ctx, ChannelFuture future, + int streamId, HttpChunk chunk, SocketAddress remoteAddress) { + + if (chunk.isLast()) { + if (chunk instanceof HttpChunkTrailer) { + HttpChunkTrailer trailer = (HttpChunkTrailer) chunk; + List> trailers = trailer.getHeaders(); + if (trailers.isEmpty()) { + SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId); + spdyDataFrame.setLast(true); + Channels.write(ctx, future, spdyDataFrame, remoteAddress); + } else { + // Create SPDY HEADERS frame out of trailers + SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(streamId); + spdyHeadersFrame.setLast(true); + for (Map.Entry entry: trailers) { + spdyHeadersFrame.addHeader(entry.getKey(), entry.getValue()); + } + Channels.write(ctx, future, spdyHeadersFrame, remoteAddress); + } + } else { + SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId); + spdyDataFrame.setLast(true); + Channels.write(ctx, future, spdyDataFrame, remoteAddress); + } + } else { + SpdyDataFrame[] spdyDataFrames = createSpdyDataFrames(streamId, chunk.getContent()); + ChannelFuture dataFuture = getDataFuture(ctx, future, spdyDataFrames, remoteAddress); + + // Trigger a write + dataFuture.setSuccess(); + } + } + + private ChannelFuture getMessageFuture( + ChannelHandlerContext ctx, MessageEvent e, int streamId, HttpMessage httpMessage) { + if (!httpMessage.getContent().readable()) { return e.getFuture(); } - // Create SPDY Data Frame out of message content - SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamID); - spdyDataFrame.setData(httpMessage.getContent()); - spdyDataFrame.setLast(true); + // Create SPDY Data Frames out of message content + SpdyDataFrame[] spdyDataFrames = createSpdyDataFrames(streamId, httpMessage.getContent()); + if (spdyDataFrames.length > 0) { + spdyDataFrames[spdyDataFrames.length - 1].setLast(true); + } - // Create new future and add listener - ChannelFuture future = Channels.future(e.getChannel()); - future.addListener(new SpdyFrameWriter(ctx, e, spdyDataFrame)); + return getDataFuture(ctx, e.getFuture(), spdyDataFrames, e.getRemoteAddress()); + } - return future; + private static ChannelFuture getDataFuture( + ChannelHandlerContext ctx, ChannelFuture future, + SpdyDataFrame[] spdyDataFrames, SocketAddress remoteAddress) { + + ChannelFuture dataFuture = future; + for (int i = spdyDataFrames.length; --i >= 0;) { + future = Channels.future(ctx.getChannel()); + future.addListener(new SpdyFrameWriter(ctx, new DownstreamMessageEvent( + ctx.getChannel(), dataFuture, spdyDataFrames[i], remoteAddress))); + dataFuture = future; + } + return dataFuture; } private static class SpdyFrameWriter implements ChannelFutureListener { private final ChannelHandlerContext ctx; private final MessageEvent e; - private final Object spdyFrame; - SpdyFrameWriter(ChannelHandlerContext ctx, MessageEvent e, Object spdyFrame) { + SpdyFrameWriter(ChannelHandlerContext ctx, MessageEvent e) { this.ctx = ctx; this.e = e; - this.spdyFrame = spdyFrame; } public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - Channels.write(ctx, e.getFuture(), spdyFrame, e.getRemoteAddress()); + ctx.sendDownstream(e); } else if (future.isCancelled()) { e.getFuture().cancel(); } else { @@ -253,8 +280,8 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { boolean chunked = httpMessage.isChunked(); // Get the Stream-ID, Associated-To-Stream-ID, Priority, URL, and scheme from the headers - int streamID = SpdyHttpHeaders.getStreamId(httpMessage); - int associatedToStreamID = SpdyHttpHeaders.getAssociatedToStreamId(httpMessage); + int streamId = SpdyHttpHeaders.getStreamId(httpMessage); + int associatedToStreamId = SpdyHttpHeaders.getAssociatedToStreamId(httpMessage); byte priority = SpdyHttpHeaders.getPriority(httpMessage); String URL = SpdyHttpHeaders.getUrl(httpMessage); String scheme = SpdyHttpHeaders.getScheme(httpMessage); @@ -272,7 +299,8 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { httpMessage.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING); SpdySynStreamFrame spdySynStreamFrame = - new DefaultSpdySynStreamFrame(streamID, associatedToStreamID, priority); + new DefaultSpdySynStreamFrame(streamId, associatedToStreamId, priority); + spdySynStreamFrame.setLast(!chunked && !httpMessage.getContent().readable()); // Unfold the first line of the message into name/value pairs if (httpMessage instanceof HttpRequest) { @@ -307,13 +335,6 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { spdySynStreamFrame.addHeader(entry.getKey(), entry.getValue()); } - if (chunked) { - currentStreamID = streamID; - spdySynStreamFrame.setLast(false); - } else { - spdySynStreamFrame.setLast(httpMessage.getContent().readableBytes() == 0); - } - return spdySynStreamFrame; } @@ -322,7 +343,7 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { boolean chunked = httpResponse.isChunked(); // Get the Stream-ID from the headers - int streamID = SpdyHttpHeaders.getStreamId(httpResponse); + int streamId = SpdyHttpHeaders.getStreamId(httpResponse); SpdyHttpHeaders.removeStreamId(httpResponse); // The Connection, Keep-Alive, Proxy-Connection, and Transfer-Encoding @@ -332,7 +353,8 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { httpResponse.removeHeader("Proxy-Connection"); httpResponse.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING); - SpdySynReplyFrame spdySynReplyFrame = new DefaultSpdySynReplyFrame(streamID); + SpdySynReplyFrame spdySynReplyFrame = new DefaultSpdySynReplyFrame(streamId); + spdySynReplyFrame.setLast(!chunked && !httpResponse.getContent().readable()); // Unfold the first line of the response into name/value pairs SpdyHeaders.setStatus(spdyVersion, spdySynReplyFrame, httpResponse.getStatus()); @@ -343,13 +365,22 @@ public class SpdyHttpEncoder implements ChannelDownstreamHandler { spdySynReplyFrame.addHeader(entry.getKey(), entry.getValue()); } - if (chunked) { - currentStreamID = streamID; - spdySynReplyFrame.setLast(false); - } else { - spdySynReplyFrame.setLast(httpResponse.getContent().readableBytes() == 0); - } - return spdySynReplyFrame; } + + private SpdyDataFrame[] createSpdyDataFrames(int streamId, ChannelBuffer content) { + int readableBytes = content.readableBytes(); + int count = readableBytes / SPDY_MAX_LENGTH; + if (readableBytes % SPDY_MAX_LENGTH > 0) { + count++; + } + SpdyDataFrame[] spdyDataFrames = new SpdyDataFrame[count]; + for (int i = 0; i < count; i ++) { + SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId); + int dataSize = Math.min(content.readableBytes(), SPDY_MAX_LENGTH); + spdyDataFrame.setData(content.readSlice(dataSize)); + spdyDataFrames[i] = spdyDataFrame; + } + return spdyDataFrames; + } }