From 832ed0c19839472364f84761055b1351d340ac64 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 9 Mar 2009 08:50:24 +0000 Subject: [PATCH] Fixed NETTY-130 (Chunked encoding emulation for a large non-chunked HTTP request) * Replaced mergeChunks option with maxChunkSize * if maxChunkSize is greater than 0 and any content or chunk larger than maxChunkSize is received, it's split into multiple chunks as if a chunked request is received. * Added unfold option to FrameDecoder and ReplayingDecoder --- .../http/HttpServerPipelineFactory.java | 2 +- .../handler/codec/frame/FrameDecoder.java | 39 +++++- .../codec/http/DefaultHttpMessage.java | 3 +- .../netty/handler/codec/http/HttpChunk.java | 4 + .../codec/http/HttpMessageDecoder.java | 129 ++++++++++++++++-- .../codec/http/HttpRequestDecoder.java | 4 +- .../codec/http/HttpResponseDecoder.java | 4 +- .../codec/replay/ReplayingDecoder.java | 38 +++++- 8 files changed, 195 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/jboss/netty/example/http/HttpServerPipelineFactory.java b/src/main/java/org/jboss/netty/example/http/HttpServerPipelineFactory.java index 3b7fddb265..e3d950a204 100644 --- a/src/main/java/org/jboss/netty/example/http/HttpServerPipelineFactory.java +++ b/src/main/java/org/jboss/netty/example/http/HttpServerPipelineFactory.java @@ -44,7 +44,7 @@ public class HttpServerPipelineFactory implements ChannelPipelineFactory { // Create a default pipeline implementation. ChannelPipeline pipeline = pipeline(); - pipeline.addLast("decoder", new HttpRequestDecoder(false)); + pipeline.addLast("decoder", new HttpRequestDecoder(8192)); pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("handler", handler); return pipeline; diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java index 70a763fc07..802004030d 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java @@ -22,8 +22,7 @@ */ package org.jboss.netty.handler.codec.frame; -import static org.jboss.netty.channel.Channels.*; - +import java.lang.reflect.Array; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicReference; @@ -35,6 +34,7 @@ import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelUpstreamHandler; +import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.LifeCycleAwareChannelHandler; import org.jboss.netty.channel.MessageEvent; @@ -151,9 +151,17 @@ import org.jboss.netty.channel.SimpleChannelHandler; public abstract class FrameDecoder extends SimpleChannelHandler implements LifeCycleAwareChannelHandler { + private final boolean unfold; private final AtomicReference cumulation = new AtomicReference(); + protected FrameDecoder() { + this(false); + } + + protected FrameDecoder(boolean unfold) { + this.unfold = unfold; + } @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) @@ -276,7 +284,30 @@ public abstract class FrameDecoder "if it returned a frame."); } - fireMessageReceived(context, frame, remoteAddress); + fireMessageReceived(context, remoteAddress, frame); + } + } + + private void fireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { + if (unfold) { + if (result instanceof Object[]) { + for (Object r: (Object[]) result) { + Channels.fireMessageReceived(context, r, remoteAddress); + } + } else if (result.getClass().isArray()){ + int length = Array.getLength(result); + for (int i = 0; i < length; i ++) { + Channels.fireMessageReceived(context, Array.get(result, i), remoteAddress); + } + } else if (result instanceof Iterable) { + for (Object r: (Iterable) result) { + Channels.fireMessageReceived(context, r, remoteAddress); + } + } else { + Channels.fireMessageReceived(context, result, remoteAddress); + } + } else { + Channels.fireMessageReceived(context, result, remoteAddress); } } @@ -291,7 +322,7 @@ public abstract class FrameDecoder // and send the remainders too if necessary. Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation); if (partialFrame != null) { - fireMessageReceived(ctx, partialFrame, null); + fireMessageReceived(ctx, null, partialFrame); } } } diff --git a/src/main/java/org/jboss/netty/handler/codec/http/DefaultHttpMessage.java b/src/main/java/org/jboss/netty/handler/codec/http/DefaultHttpMessage.java index 2106581460..47fd1804a7 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/DefaultHttpMessage.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/DefaultHttpMessage.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.TreeMap; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.util.CaseIgnoringComparator; /** @@ -42,7 +43,7 @@ public class DefaultHttpMessage implements HttpMessage { private final HttpVersion version; private final Map> headers = new TreeMap>(CaseIgnoringComparator.INSTANCE); - private ChannelBuffer content; + private ChannelBuffer content = ChannelBuffers.EMPTY_BUFFER; protected DefaultHttpMessage(final HttpVersion version) { this.version = version; diff --git a/src/main/java/org/jboss/netty/handler/codec/http/HttpChunk.java b/src/main/java/org/jboss/netty/handler/codec/http/HttpChunk.java index 3ddffeaf7e..89b180578a 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/HttpChunk.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/HttpChunk.java @@ -23,6 +23,7 @@ package org.jboss.netty.handler.codec.http; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; /** * @author The Netty Project (netty-dev@lists.jboss.org) @@ -30,6 +31,9 @@ import org.jboss.netty.buffer.ChannelBuffer; * @version $Rev$, $Date$ */ public interface HttpChunk { + + static HttpChunk LAST_CHUNK = new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER); + boolean isLast(); ChannelBuffer getContent(); } diff --git a/src/main/java/org/jboss/netty/handler/codec/http/HttpMessageDecoder.java b/src/main/java/org/jboss/netty/handler/codec/http/HttpMessageDecoder.java index b9b6f56bd4..872c98aac9 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/HttpMessageDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/HttpMessageDecoder.java @@ -21,8 +21,6 @@ */ package org.jboss.netty.handler.codec.http; -import static org.jboss.netty.buffer.ChannelBuffers.*; - import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -48,7 +46,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder 0; } @Override @@ -100,18 +110,39 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder maxChunkSize && nextState == State.READ_FIXED_LENGTH_CONTENT) { + // Generate HttpMessage first. HttpChunks will follow. + checkpoint(State.READ_FIXED_LENGTH_CONTENT_AS_CHUNKS); + message.addHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED); + // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT_AS_CHUNKS + // state reads data chunk by chunk. + chunkSize = message.getContentLength(-1); + return message; + } else if (nextState == State.READ_VARIABLE_LENGTH_CONTENT) { + // Generate HttpMessage first. HttpChunks will follow. + checkpoint(State.READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS); + message.addHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED); + return message; + } + } } - //we return null here, this forces decode to be called again where we will decode the content + // We return null here, this forces decode to be called again where we will decode the content return null; } case READ_VARIABLE_LENGTH_CONTENT: { @@ -122,36 +153,103 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder maxChunkSize) { + chunk = new DefaultHttpChunk(buffer.readBytes(maxChunkSize)); + chunkSize -= maxChunkSize; + } else { + chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize)); + chunkSize = 0; + } + this.chunkSize = chunkSize; + + if (chunkSize == 0) { + // Read all content. + reset(); + if (!chunk.isLast()) { + // Append the last chunk. + return new HttpChunk[] { chunk, HttpChunk.LAST_CHUNK }; + } + } + return chunk; + } /** * everything else after this point takes care of reading chunked content. basically, read chunk size, * read chunk, read and ignore the CRLF and repeat until 0 */ case READ_CHUNK_SIZE: { String line = readIntoCurrentLine(buffer); - chunkSize = getChunkSize(line); + int chunkSize = getChunkSize(line); + this.chunkSize = chunkSize; if (chunkSize == 0) { checkpoint(State.READ_CHUNK_FOOTER); return null; + } else if (canGenerateChunks()) { + if (chunkSize <= maxChunkSize) { + checkpoint(State.READ_CHUNKED_CONTENT); + } else { + // A chunk is too large. Split them into multiple chunks again. + checkpoint(State.READ_CHUNKED_CONTENT_AS_CHUNKS); + } } else { checkpoint(State.READ_CHUNKED_CONTENT); } } case READ_CHUNKED_CONTENT: { - if (mergeChunks) { + if (canGenerateChunks()) { + HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize)); + checkpoint(State.READ_CHUNK_DELIMITER); + return chunk; + } else { if (content == null) { content = ChannelBuffers.dynamicBuffer( chunkSize, channel.getConfig().getBufferFactory()); } content.writeBytes(buffer, chunkSize); checkpoint(State.READ_CHUNK_DELIMITER); + return null; + } + } + case READ_CHUNKED_CONTENT_AS_CHUNKS: { + int chunkSize = this.chunkSize; + HttpChunk chunk; + if (chunkSize > maxChunkSize) { + chunk = new DefaultHttpChunk(buffer.readBytes(maxChunkSize)); + chunkSize -= maxChunkSize; } else { - HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize)); + chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize)); + chunkSize = 0; + } + this.chunkSize = chunkSize; + + if (chunkSize == 0) { + // Read all content. checkpoint(State.READ_CHUNK_DELIMITER); + } + + if (!chunk.isLast()) { return chunk; } } @@ -172,12 +270,13 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder> private final AtomicReference cumulation = new AtomicReference(); + private final boolean unfold; private volatile ReplayingDecoderBuffer replayable; private volatile T state; private volatile int checkpoint; @@ -231,11 +231,20 @@ public abstract class ReplayingDecoder> this(null); } + protected ReplayingDecoder(boolean unfold) { + this(null, unfold); + } + /** * Creates a new instance with the specified initial state. */ protected ReplayingDecoder(T initialState) { + this(initialState, false); + } + + protected ReplayingDecoder(T initialState, boolean unfold) { this.state = initialState; + this.unfold = unfold; } /** @@ -398,6 +407,29 @@ public abstract class ReplayingDecoder> } // A successful decode + fireMessageReceived(context, remoteAddress, result); + } + } + + private void fireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { + if (unfold) { + if (result instanceof Object[]) { + for (Object r: (Object[]) result) { + Channels.fireMessageReceived(context, r, remoteAddress); + } + } else if (result.getClass().isArray()){ + int length = Array.getLength(result); + for (int i = 0; i < length; i ++) { + Channels.fireMessageReceived(context, Array.get(result, i), remoteAddress); + } + } else if (result instanceof Iterable) { + for (Object r: (Iterable) result) { + Channels.fireMessageReceived(context, r, remoteAddress); + } + } else { + Channels.fireMessageReceived(context, result, remoteAddress); + } + } else { Channels.fireMessageReceived(context, result, remoteAddress); } } @@ -413,7 +445,7 @@ public abstract class ReplayingDecoder> // and send the remainders too if necessary. Object partiallyDecoded = decodeLast(ctx, e.getChannel(), cumulation, state); if (partiallyDecoded != null) { - fireMessageReceived(ctx, partiallyDecoded, null); + fireMessageReceived(ctx, null, partiallyDecoded); } } }