Derek Troy-West 854859ba69 Change AggregatedFullHttpMessage to contain a content ByteBuf

Other implementations of FullHttpMessage allow .toString to be called after the Message has been released
This brings AggregatedFullHttpMessage into line with those impls.


- Changed AggregatedFullHttpMessage to no longer be a sub-class of DefaultByteBufHolder
- Changes AggregatedFullHttpMessage to implement ByteBufHolder
- Hold the content buffer internally to AggregatedFullHttpMessage
- Implement the required content() and release() methods that were missing
- Do not check refcnt when accessing content() (similar to DefaultFullHttpMessage)


A released AggregatedFullHttpMessage can have .toString called without throwing an exception
2015-04-16 14:43:50 +02:00

524 lines
17 KiB

package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.MessageAggregator;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
* A {@link ChannelHandler} that aggregates an {@link HttpMessage}
* and its following {@link HttpContent}s into a single {@link FullHttpRequest}
* or {@link FullHttpResponse} (depending on if it used to handle requests or responses)
* with no following {@link HttpContent}s. It is useful when you don't want to take
* care of HTTP messages whose transfer encoding is 'chunked'. Insert this
* handler after {@link HttpObjectDecoder} in the {@link ChannelPipeline}:
* <pre>
* {@link ChannelPipeline} p = ...;
* ...
* p.addLast("encoder", new {@link HttpResponseEncoder}());
* p.addLast("decoder", new {@link HttpRequestDecoder}());
* p.addLast("aggregator", <b>new {@link HttpObjectAggregator}(1048576)</b>);
* ...
* p.addLast("handler", new HttpRequestHandler());
* </pre>
* Be aware that you need to have the {@link HttpResponseEncoder} or {@link HttpRequestEncoder}
* before the {@link HttpObjectAggregator} in the {@link ChannelPipeline}.
public class HttpObjectAggregator
extends MessageAggregator<HttpObject, HttpMessage, HttpContent, FullHttpMessage> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpObjectAggregator.class);
private static final FullHttpResponse CONTINUE = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);
private static final FullHttpResponse TOO_LARGE = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);
static {
TOO_LARGE.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
* Creates a new instance.
* @param maxContentLength
* the maximum length of the aggregated content in bytes.
* If the length of the aggregated content exceeds this value,
* {@link #handleOversizedMessage(ChannelHandlerContext, HttpMessage)}
* will be called.
public HttpObjectAggregator(int maxContentLength) {
protected boolean isStartMessage(HttpObject msg) throws Exception {
return msg instanceof HttpMessage;
protected boolean isContentMessage(HttpObject msg) throws Exception {
return msg instanceof HttpContent;
protected boolean isLastContentMessage(HttpContent msg) throws Exception {
return msg instanceof LastHttpContent;
protected boolean isAggregated(HttpObject msg) throws Exception {
return msg instanceof FullHttpMessage;
protected boolean hasContentLength(HttpMessage start) throws Exception {
return HttpHeaderUtil.isContentLengthSet(start);
protected long contentLength(HttpMessage start) throws Exception {
return HttpHeaderUtil.getContentLength(start);
protected Object newContinueResponse(HttpMessage start) throws Exception {
if (HttpHeaderUtil.is100ContinueExpected(start)) {
return CONTINUE;
} else {
return null;
protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception {
assert !(start instanceof FullHttpMessage);
HttpHeaderUtil.setTransferEncodingChunked(start, false);
AggregatedFullHttpMessage ret;
if (start instanceof HttpRequest) {
ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);
} else if (start instanceof HttpResponse) {
ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);
} else {
throw new Error();
return ret;
protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception {
if (content instanceof LastHttpContent) {
// Merge trailing headers into the message.
((AggregatedFullHttpMessage) aggregated).setTrailingHeaders(((LastHttpContent) content).trailingHeaders());
protected void finishAggregation(FullHttpMessage aggregated) throws Exception {
// Set the 'Content-Length' header. If one isn't already set.
// This is important as HEAD responses will use a 'Content-Length' header which
// does not match the actual body, but the number of bytes that would be
// transmitted if a GET would have been used.
// See rfc2616 14.13 Content-Length
if (!HttpHeaderUtil.isContentLengthSet(aggregated)) {
protected void handleOversizedMessage(final ChannelHandlerContext ctx, HttpMessage oversized) throws Exception {
if (oversized instanceof HttpRequest) {
// send back a 413 and close the connection
ChannelFuture future = ctx.writeAndFlush(TOO_LARGE).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
logger.debug("Failed to send a 413 Request Entity Too Large.", future.cause());
// If the client started to send data already, close because it's impossible to recover.
// If keep-alive is off and 'Expect: 100-continue' is missing, no need to leave the connection open.
if (oversized instanceof FullHttpMessage ||
!HttpHeaderUtil.is100ContinueExpected(oversized) && !HttpHeaderUtil.isKeepAlive(oversized)) {
// If an oversized request was handled properly and the connection is still alive
// (i.e. rejected 100-continue). the decoder should prepare to handle a new message.
HttpObjectDecoder decoder = ctx.pipeline().get(HttpObjectDecoder.class);
if (decoder != null) {
} else if (oversized instanceof HttpResponse) {
throw new TooLongFrameException("Response entity too large: " + oversized);
} else {
throw new IllegalStateException();
private abstract static class AggregatedFullHttpMessage implements ByteBufHolder, FullHttpMessage {
protected final HttpMessage message;
private final ByteBuf content;
private HttpHeaders trailingHeaders;
AggregatedFullHttpMessage(HttpMessage message, ByteBuf content, HttpHeaders trailingHeaders) {
this.message = message;
this.content = content;
this.trailingHeaders = trailingHeaders;
public HttpHeaders trailingHeaders() {
HttpHeaders trailingHeaders = this.trailingHeaders;
if (trailingHeaders == null) {
return HttpHeaders.EMPTY_HEADERS;
} else {
return trailingHeaders;
void setTrailingHeaders(HttpHeaders trailingHeaders) {
this.trailingHeaders = trailingHeaders;
public HttpVersion getProtocolVersion() {
return message.protocolVersion();
public HttpVersion protocolVersion() {
return message.protocolVersion();
public FullHttpMessage setProtocolVersion(HttpVersion version) {
return this;
public HttpHeaders headers() {
return message.headers();
public DecoderResult decoderResult() {
return message.decoderResult();
public DecoderResult getDecoderResult() {
return message.decoderResult();
public void setDecoderResult(DecoderResult result) {
public ByteBuf content() {
return content;
public int refCnt() {
return content.refCnt();
public FullHttpMessage retain() {
return this;
public FullHttpMessage retain(int increment) {
return this;
public FullHttpMessage touch(Object hint) {
return this;
public FullHttpMessage touch() {
return this;
public boolean release() {
return content.release();
public boolean release(int decrement) {
return content.release(decrement);
public abstract FullHttpMessage copy();
public abstract FullHttpMessage duplicate();
private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage implements FullHttpRequest {
AggregatedFullHttpRequest(HttpRequest request, ByteBuf content, HttpHeaders trailingHeaders) {
super(request, content, trailingHeaders);
* Copy this object
* @param copyContent
* <ul>
* <li>{@code true} if this object's {@link #content()} should be used to copy.</li>
* <li>{@code false} if {@code newContent} should be used instead.</li>
* </ul>
* @param newContent
* <ul>
* <li>if {@code copyContent} is false then this will be used in the copy's content.</li>
* <li>if {@code null} then a default buffer of 0 size will be selected</li>
* </ul>
* @return A copy of this object
private FullHttpRequest copy(boolean copyContent, ByteBuf newContent) {
DefaultFullHttpRequest copy = new DefaultFullHttpRequest(
protocolVersion(), method(), uri(),
copyContent ? content().copy() :
newContent == null ? Unpooled.buffer(0) : newContent);
return copy;
public FullHttpRequest copy(ByteBuf newContent) {
return copy(false, newContent);
public FullHttpRequest copy() {
return copy(true, null);
public FullHttpRequest duplicate() {
DefaultFullHttpRequest duplicate = new DefaultFullHttpRequest(
getProtocolVersion(), getMethod(), getUri(), content().duplicate());
return duplicate;
public FullHttpRequest retain(int increment) {
return this;
public FullHttpRequest retain() {
return this;
public FullHttpRequest touch() {
return this;
public FullHttpRequest touch(Object hint) {
return this;
public FullHttpRequest setMethod(HttpMethod method) {
((HttpRequest) message).setMethod(method);
return this;
public FullHttpRequest setUri(String uri) {
((HttpRequest) message).setUri(uri);
return this;
public HttpMethod getMethod() {
return ((HttpRequest) message).method();
public String getUri() {
return ((HttpRequest) message).uri();
public HttpMethod method() {
return getMethod();
public String uri() {
return getUri();
public FullHttpRequest setProtocolVersion(HttpVersion version) {
return this;
public String toString() {
return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();
private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage
implements FullHttpResponse {
AggregatedFullHttpResponse(HttpResponse message, ByteBuf content, HttpHeaders trailingHeaders) {
super(message, content, trailingHeaders);
* Copy this object
* @param copyContent
* <ul>
* <li>{@code true} if this object's {@link #content()} should be used to copy.</li>
* <li>{@code false} if {@code newContent} should be used instead.</li>
* </ul>
* @param newContent
* <ul>
* <li>if {@code copyContent} is false then this will be used in the copy's content.</li>
* <li>if {@code null} then a default buffer of 0 size will be selected</li>
* </ul>
* @return A copy of this object
private FullHttpResponse copy(boolean copyContent, ByteBuf newContent) {
DefaultFullHttpResponse copy = new DefaultFullHttpResponse(
protocolVersion(), status(),
copyContent ? content().copy() :
newContent == null ? Unpooled.buffer(0) : newContent);
return copy;
public FullHttpResponse copy(ByteBuf newContent) {
return copy(false, newContent);
public FullHttpResponse copy() {
return copy(true, null);
public FullHttpResponse duplicate() {
DefaultFullHttpResponse duplicate = new DefaultFullHttpResponse(getProtocolVersion(), getStatus(),
return duplicate;
public FullHttpResponse setStatus(HttpResponseStatus status) {
((HttpResponse) message).setStatus(status);
return this;
public HttpResponseStatus getStatus() {
return ((HttpResponse) message).status();
public HttpResponseStatus status() {
return getStatus();
public FullHttpResponse setProtocolVersion(HttpVersion version) {
return this;
public FullHttpResponse retain(int increment) {
return this;
public FullHttpResponse retain() {
return this;
public FullHttpResponse touch(Object hint) {
return this;
public FullHttpResponse touch() {
return this;
public String toString() {
return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();