Fixed NETTY-130 (Chunked encoding emulation for a large non-chunked HTTP request)

* Replaced mergeChunks option with maxChunkSize
* if maxChunkSize is greater than 0 and any content or chunk larger than maxChunkSize is received, it's split into multiple chunks as if a chunked request is received.
* Added unfold option to FrameDecoder and ReplayingDecoder
This commit is contained in:
Trustin Lee 2009-03-09 08:50:24 +00:00
parent 7985fa94a9
commit 832ed0c198
8 changed files with 195 additions and 28 deletions

View File

@ -44,7 +44,7 @@ public class HttpServerPipelineFactory implements ChannelPipelineFactory {
// Create a default pipeline implementation. // Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline(); ChannelPipeline pipeline = pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder(false)); pipeline.addLast("decoder", new HttpRequestDecoder(8192));
pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", handler); pipeline.addLast("handler", handler);
return pipeline; return pipeline;

View File

@ -22,8 +22,7 @@
*/ */
package org.jboss.netty.handler.codec.frame; package org.jboss.netty.handler.codec.frame;
import static org.jboss.netty.channel.Channels.*; import java.lang.reflect.Array;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -35,6 +34,7 @@ import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.LifeCycleAwareChannelHandler; import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
@ -151,9 +151,17 @@ import org.jboss.netty.channel.SimpleChannelHandler;
public abstract class FrameDecoder public abstract class FrameDecoder
extends SimpleChannelHandler implements LifeCycleAwareChannelHandler { extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
private final boolean unfold;
private final AtomicReference<ChannelBuffer> cumulation = private final AtomicReference<ChannelBuffer> cumulation =
new AtomicReference<ChannelBuffer>(); new AtomicReference<ChannelBuffer>();
protected FrameDecoder() {
this(false);
}
protected FrameDecoder(boolean unfold) {
this.unfold = unfold;
}
@Override @Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
@ -276,7 +284,30 @@ public abstract class FrameDecoder
"if it returned a frame."); "if it returned a frame.");
} }
fireMessageReceived(context, frame, remoteAddress); fireMessageReceived(context, remoteAddress, frame);
}
}
private void fireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else if (result.getClass().isArray()){
int length = Array.getLength(result);
for (int i = 0; i < length; i ++) {
Channels.fireMessageReceived(context, Array.get(result, i), remoteAddress);
}
} else if (result instanceof Iterable) {
for (Object r: (Iterable<?>) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
} }
} }
@ -291,7 +322,7 @@ public abstract class FrameDecoder
// and send the remainders too if necessary. // and send the remainders too if necessary.
Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation); Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
if (partialFrame != null) { if (partialFrame != null) {
fireMessageReceived(ctx, partialFrame, null); fireMessageReceived(ctx, null, partialFrame);
} }
} }
} }

View File

@ -28,6 +28,7 @@ import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CaseIgnoringComparator; import org.jboss.netty.util.CaseIgnoringComparator;
/** /**
@ -42,7 +43,7 @@ public class DefaultHttpMessage implements HttpMessage {
private final HttpVersion version; private final HttpVersion version;
private final Map<String, List<String>> headers = new TreeMap<String, List<String>>(CaseIgnoringComparator.INSTANCE); private final Map<String, List<String>> headers = new TreeMap<String, List<String>>(CaseIgnoringComparator.INSTANCE);
private ChannelBuffer content; private ChannelBuffer content = ChannelBuffers.EMPTY_BUFFER;
protected DefaultHttpMessage(final HttpVersion version) { protected DefaultHttpMessage(final HttpVersion version) {
this.version = version; this.version = version;

View File

@ -23,6 +23,7 @@
package org.jboss.netty.handler.codec.http; package org.jboss.netty.handler.codec.http;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
/** /**
* @author The Netty Project (netty-dev@lists.jboss.org) * @author The Netty Project (netty-dev@lists.jboss.org)
@ -30,6 +31,9 @@ import org.jboss.netty.buffer.ChannelBuffer;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
public interface HttpChunk { public interface HttpChunk {
static HttpChunk LAST_CHUNK = new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER);
boolean isLast(); boolean isLast();
ChannelBuffer getContent(); ChannelBuffer getContent();
} }

View File

@ -21,8 +21,6 @@
*/ */
package org.jboss.netty.handler.codec.http; package org.jboss.netty.handler.codec.http;
import static org.jboss.netty.buffer.ChannelBuffers.*;
import java.util.List; import java.util.List;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -48,7 +46,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
private static final Pattern HEADER_PATTERN = Pattern.compile( private static final Pattern HEADER_PATTERN = Pattern.compile(
"^\\s*(\\S+)\\s*:\\s*(.*)\\s*$"); "^\\s*(\\S+)\\s*:\\s*(.*)\\s*$");
private final boolean mergeChunks; private final int maxChunkSize;
protected volatile HttpMessage message; protected volatile HttpMessage message;
private volatile ChannelBuffer content; private volatile ChannelBuffer content;
private volatile int chunkSize; private volatile int chunkSize;
@ -65,20 +63,32 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
READ_INITIAL, READ_INITIAL,
READ_HEADER, READ_HEADER,
READ_VARIABLE_LENGTH_CONTENT, READ_VARIABLE_LENGTH_CONTENT,
READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS,
READ_FIXED_LENGTH_CONTENT, READ_FIXED_LENGTH_CONTENT,
READ_FIXED_LENGTH_CONTENT_AS_CHUNKS,
READ_CHUNK_SIZE, READ_CHUNK_SIZE,
READ_CHUNKED_CONTENT, READ_CHUNKED_CONTENT,
READ_CHUNKED_CONTENT_AS_CHUNKS,
READ_CHUNK_DELIMITER, READ_CHUNK_DELIMITER,
READ_CHUNK_FOOTER; READ_CHUNK_FOOTER;
} }
protected HttpMessageDecoder() { protected HttpMessageDecoder() {
this(true); this(0);
} }
protected HttpMessageDecoder(boolean mergeChunks) { protected HttpMessageDecoder(int maxChunkSize) {
super(State.SKIP_CONTROL_CHARS); super(State.SKIP_CONTROL_CHARS, true);
this.mergeChunks = mergeChunks; if (maxChunkSize < 0) {
throw new IllegalArgumentException(
"maxChunkSize must not be a negative integer: " +
maxChunkSize);
}
this.maxChunkSize = maxChunkSize;
}
private boolean canGenerateChunks() {
return maxChunkSize > 0;
} }
@Override @Override
@ -100,18 +110,39 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
checkpoint(nextState); checkpoint(nextState);
if (nextState == State.READ_CHUNK_SIZE) { if (nextState == State.READ_CHUNK_SIZE) {
// Chunked encoding // Chunked encoding
if (!mergeChunks) { if (canGenerateChunks()) {
// Generate HttpMessage first. HttpChunks will follow.
return message; return message;
} else {
// Merge all chunks.
} }
} else { } else {
// Not a chunked encoding
int contentLength = message.getContentLength(-1); int contentLength = message.getContentLength(-1);
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) { if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
content = ChannelBuffers.EMPTY_BUFFER; content = ChannelBuffers.EMPTY_BUFFER;
return reset(); return reset();
} }
if (canGenerateChunks()) {
// Emulate chunked encoding if the content is too large or
// the content length is indefinite.
if (contentLength > maxChunkSize && nextState == State.READ_FIXED_LENGTH_CONTENT) {
// Generate HttpMessage first. HttpChunks will follow.
checkpoint(State.READ_FIXED_LENGTH_CONTENT_AS_CHUNKS);
message.addHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT_AS_CHUNKS
// state reads data chunk by chunk.
chunkSize = message.getContentLength(-1);
return message;
} else if (nextState == State.READ_VARIABLE_LENGTH_CONTENT) {
// Generate HttpMessage first. HttpChunks will follow.
checkpoint(State.READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS);
message.addHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
return message;
}
}
} }
//we return null here, this forces decode to be called again where we will decode the content // We return null here, this forces decode to be called again where we will decode the content
return null; return null;
} }
case READ_VARIABLE_LENGTH_CONTENT: { case READ_VARIABLE_LENGTH_CONTENT: {
@ -122,36 +153,103 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
content.writeBytes(buffer.readBytes(buffer.readableBytes())); content.writeBytes(buffer.readBytes(buffer.readableBytes()));
return reset(); return reset();
} }
case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: {
// Keep reading data as a chunk until the end of connection is reached.
int chunkSize = Math.min(maxChunkSize, buffer.readableBytes());
HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize));
if (!buffer.readable()) {
// Reached to the end of the connection.
reset();
if (!chunk.isLast()) {
// Append the last chunk.
return new HttpChunk[] { chunk, HttpChunk.LAST_CHUNK };
}
}
return chunk;
}
case READ_FIXED_LENGTH_CONTENT: { case READ_FIXED_LENGTH_CONTENT: {
//we have a content-length so we just read the correct number of bytes //we have a content-length so we just read the correct number of bytes
readFixedLengthContent(buffer); readFixedLengthContent(buffer);
return reset(); return reset();
} }
case READ_FIXED_LENGTH_CONTENT_AS_CHUNKS: {
int chunkSize = this.chunkSize;
HttpChunk chunk;
if (chunkSize > maxChunkSize) {
chunk = new DefaultHttpChunk(buffer.readBytes(maxChunkSize));
chunkSize -= maxChunkSize;
} else {
chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize));
chunkSize = 0;
}
this.chunkSize = chunkSize;
if (chunkSize == 0) {
// Read all content.
reset();
if (!chunk.isLast()) {
// Append the last chunk.
return new HttpChunk[] { chunk, HttpChunk.LAST_CHUNK };
}
}
return chunk;
}
/** /**
* everything else after this point takes care of reading chunked content. basically, read chunk size, * everything else after this point takes care of reading chunked content. basically, read chunk size,
* read chunk, read and ignore the CRLF and repeat until 0 * read chunk, read and ignore the CRLF and repeat until 0
*/ */
case READ_CHUNK_SIZE: { case READ_CHUNK_SIZE: {
String line = readIntoCurrentLine(buffer); String line = readIntoCurrentLine(buffer);
chunkSize = getChunkSize(line); int chunkSize = getChunkSize(line);
this.chunkSize = chunkSize;
if (chunkSize == 0) { if (chunkSize == 0) {
checkpoint(State.READ_CHUNK_FOOTER); checkpoint(State.READ_CHUNK_FOOTER);
return null; return null;
} else if (canGenerateChunks()) {
if (chunkSize <= maxChunkSize) {
checkpoint(State.READ_CHUNKED_CONTENT);
} else {
// A chunk is too large. Split them into multiple chunks again.
checkpoint(State.READ_CHUNKED_CONTENT_AS_CHUNKS);
}
} else { } else {
checkpoint(State.READ_CHUNKED_CONTENT); checkpoint(State.READ_CHUNKED_CONTENT);
} }
} }
case READ_CHUNKED_CONTENT: { case READ_CHUNKED_CONTENT: {
if (mergeChunks) { if (canGenerateChunks()) {
HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize));
checkpoint(State.READ_CHUNK_DELIMITER);
return chunk;
} else {
if (content == null) { if (content == null) {
content = ChannelBuffers.dynamicBuffer( content = ChannelBuffers.dynamicBuffer(
chunkSize, channel.getConfig().getBufferFactory()); chunkSize, channel.getConfig().getBufferFactory());
} }
content.writeBytes(buffer, chunkSize); content.writeBytes(buffer, chunkSize);
checkpoint(State.READ_CHUNK_DELIMITER); checkpoint(State.READ_CHUNK_DELIMITER);
return null;
}
}
case READ_CHUNKED_CONTENT_AS_CHUNKS: {
int chunkSize = this.chunkSize;
HttpChunk chunk;
if (chunkSize > maxChunkSize) {
chunk = new DefaultHttpChunk(buffer.readBytes(maxChunkSize));
chunkSize -= maxChunkSize;
} else { } else {
HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize)); chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize));
chunkSize = 0;
}
this.chunkSize = chunkSize;
if (chunkSize == 0) {
// Read all content.
checkpoint(State.READ_CHUNK_DELIMITER); checkpoint(State.READ_CHUNK_DELIMITER);
}
if (!chunk.isLast()) {
return chunk; return chunk;
} }
} }
@ -172,12 +270,13 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
case READ_CHUNK_FOOTER: { case READ_CHUNK_FOOTER: {
String line = readIntoCurrentLine(buffer); String line = readIntoCurrentLine(buffer);
if (line.trim().length() == 0) { if (line.trim().length() == 0) {
if (mergeChunks) { if (maxChunkSize == 0) {
// Chunked encoding disabled.
return reset(); return reset();
} else { } else {
reset(); reset();
// The last chunk, which is empty // The last chunk, which is empty
return new DefaultHttpChunk(EMPTY_BUFFER); return HttpChunk.LAST_CHUNK;
} }
} else { } else {
checkpoint(State.READ_CHUNK_FOOTER); checkpoint(State.READ_CHUNK_FOOTER);

View File

@ -37,8 +37,8 @@ public class HttpRequestDecoder extends HttpMessageDecoder {
super(); super();
} }
public HttpRequestDecoder(boolean mergeChunks) { public HttpRequestDecoder(int maxChunkSize) {
super(mergeChunks); super(maxChunkSize);
} }
@Override @Override

View File

@ -37,8 +37,8 @@ public class HttpResponseDecoder extends HttpMessageDecoder {
super(); super();
} }
public HttpResponseDecoder(boolean mergeChunks) { public HttpResponseDecoder(int maxChunkSize) {
super(mergeChunks); super(maxChunkSize);
} }
@Override @Override

View File

@ -22,8 +22,7 @@
*/ */
package org.jboss.netty.handler.codec.replay; package org.jboss.netty.handler.codec.replay;
import static org.jboss.netty.channel.Channels.*; import java.lang.reflect.Array;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -220,6 +219,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
private final AtomicReference<ChannelBuffer> cumulation = private final AtomicReference<ChannelBuffer> cumulation =
new AtomicReference<ChannelBuffer>(); new AtomicReference<ChannelBuffer>();
private final boolean unfold;
private volatile ReplayingDecoderBuffer replayable; private volatile ReplayingDecoderBuffer replayable;
private volatile T state; private volatile T state;
private volatile int checkpoint; private volatile int checkpoint;
@ -231,11 +231,20 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
this(null); this(null);
} }
protected ReplayingDecoder(boolean unfold) {
this(null, unfold);
}
/** /**
* Creates a new instance with the specified initial state. * Creates a new instance with the specified initial state.
*/ */
protected ReplayingDecoder(T initialState) { protected ReplayingDecoder(T initialState) {
this(initialState, false);
}
protected ReplayingDecoder(T initialState, boolean unfold) {
this.state = initialState; this.state = initialState;
this.unfold = unfold;
} }
/** /**
@ -398,6 +407,29 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
} }
// A successful decode // A successful decode
fireMessageReceived(context, remoteAddress, result);
}
}
private void fireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else if (result.getClass().isArray()){
int length = Array.getLength(result);
for (int i = 0; i < length; i ++) {
Channels.fireMessageReceived(context, Array.get(result, i), remoteAddress);
}
} else if (result instanceof Iterable) {
for (Object r: (Iterable<?>) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress); Channels.fireMessageReceived(context, result, remoteAddress);
} }
} }
@ -413,7 +445,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
// and send the remainders too if necessary. // and send the remainders too if necessary.
Object partiallyDecoded = decodeLast(ctx, e.getChannel(), cumulation, state); Object partiallyDecoded = decodeLast(ctx, e.getChannel(), cumulation, state);
if (partiallyDecoded != null) { if (partiallyDecoded != null) {
fireMessageReceived(ctx, partiallyDecoded, null); fireMessageReceived(ctx, null, partiallyDecoded);
} }
} }
} }