Reuse previous created HttpHeaders by HttpObjectAggregator

Motivation:

HttpObjectAggregator currently creates a new FullHttpResponse / FullHttpRequest for each message it needs to aggregate. While doing so it also creates 2 DefaultHttpHeader instances (one for the headers and one for the trailing headers). This is bad for two reasons:
  - More objects are created then needed and also populate the headers is not for free
  - Headers may get validated even if the validation was disabled in the decoder

Modification:

- Wrap the previous created HttpResponse / HttpRequest and so reuse the original HttpHeaders
- Reuse the previous created trailing HttpHeader.
- Fix a bug where the trailing HttpHeader was incorrectly mixed in the headers.

Result:

- Less GC
- Faster HttpObjectAggregator implementation
This commit is contained in:
Norman Maurer 2014-07-04 11:48:23 +02:00
parent 34febdc987
commit dfa5a88978
2 changed files with 225 additions and 13 deletions

View File

@ -16,12 +16,14 @@
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.MessageAggregator; import io.netty.handler.codec.MessageAggregator;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -50,7 +52,6 @@ public class HttpObjectAggregator
extends MessageAggregator<HttpObject, HttpMessage, HttpContent, FullHttpMessage> { extends MessageAggregator<HttpObject, HttpMessage, HttpContent, FullHttpMessage> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpObjectAggregator.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpObjectAggregator.class);
private static final FullHttpResponse CONTINUE = new DefaultFullHttpResponse( private static final FullHttpResponse CONTINUE = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER); HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);
private static final FullHttpResponse TOO_LARGE = new DefaultFullHttpResponse( private static final FullHttpResponse TOO_LARGE = new DefaultFullHttpResponse(
@ -118,21 +119,14 @@ public class HttpObjectAggregator
HttpHeaderUtil.setTransferEncodingChunked(start, false); HttpHeaderUtil.setTransferEncodingChunked(start, false);
FullHttpMessage ret; AggregatedFullHttpMessage ret;
if (start instanceof HttpRequest) { if (start instanceof HttpRequest) {
HttpRequest req = (HttpRequest) start; ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);
ret = new DefaultFullHttpRequest(req.protocolVersion(),
req.method(), req.uri(), content);
} else if (start instanceof HttpResponse) { } else if (start instanceof HttpResponse) {
HttpResponse res = (HttpResponse) start; ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);
ret = new DefaultFullHttpResponse(
res.protocolVersion(), res.status(), content);
} else { } else {
throw new Error(); throw new Error();
} }
ret.headers().set(start.headers());
ret.setDecoderResult(start.decoderResult());
return ret; return ret;
} }
@ -140,7 +134,7 @@ public class HttpObjectAggregator
protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception { protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception {
if (content instanceof LastHttpContent) { if (content instanceof LastHttpContent) {
// Merge trailing headers into the message. // Merge trailing headers into the message.
aggregated.headers().add(((LastHttpContent) content).trailingHeaders()); ((AggregatedFullHttpMessage) aggregated).setTrailingHeaders(((LastHttpContent) content).trailingHeaders());
} }
} }
@ -187,4 +181,222 @@ public class HttpObjectAggregator
throw new IllegalStateException(); throw new IllegalStateException();
} }
} }
private abstract static class AggregatedFullHttpMessage extends DefaultByteBufHolder implements FullHttpMessage {
protected final HttpMessage message;
private HttpHeaders trailingHeaders;
private AggregatedFullHttpMessage(HttpMessage message, ByteBuf content, HttpHeaders trailingHeaders) {
super(content);
this.message = message;
this.trailingHeaders = trailingHeaders;
}
@Override
public HttpHeaders trailingHeaders() {
return trailingHeaders;
}
public void setTrailingHeaders(HttpHeaders trailingHeaders) {
this.trailingHeaders = trailingHeaders;
}
@Override
public HttpVersion protocolVersion() {
return message.protocolVersion();
}
@Override
public FullHttpMessage setProtocolVersion(HttpVersion version) {
message.setProtocolVersion(version);
return this;
}
@Override
public HttpHeaders headers() {
return message.headers();
}
@Override
public DecoderResult decoderResult() {
return message.decoderResult();
}
@Override
public void setDecoderResult(DecoderResult result) {
message.setDecoderResult(result);
}
@Override
public FullHttpMessage retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FullHttpMessage retain() {
super.retain();
return this;
}
@Override
public FullHttpMessage touch(Object hint) {
super.touch(hint);
return this;
}
@Override
public FullHttpMessage touch() {
super.touch();
return this;
}
@Override
public abstract FullHttpMessage copy();
@Override
public abstract FullHttpMessage duplicate();
}
private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage implements FullHttpRequest {
private AggregatedFullHttpRequest(HttpRequest request, ByteBuf content, HttpHeaders trailingHeaders) {
super(request, content, trailingHeaders);
}
@Override
public FullHttpRequest copy() {
DefaultFullHttpRequest copy = new DefaultFullHttpRequest(
protocolVersion(), method(), uri(), content().copy());
copy.headers().set(headers());
copy.trailingHeaders().set(trailingHeaders());
return copy;
}
@Override
public FullHttpRequest duplicate() {
DefaultFullHttpRequest duplicate = new DefaultFullHttpRequest(
protocolVersion(), method(), uri(), content().duplicate());
duplicate.headers().set(headers());
duplicate.trailingHeaders().set(trailingHeaders());
return duplicate;
}
@Override
public FullHttpRequest retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FullHttpRequest retain() {
super.retain();
return this;
}
@Override
public FullHttpRequest touch() {
super.touch();
return this;
}
@Override
public FullHttpRequest touch(Object hint) {
super.touch(hint);
return this;
}
@Override
public FullHttpRequest setMethod(HttpMethod method) {
((HttpRequest) message).setMethod(method);
return this;
}
@Override
public FullHttpRequest setUri(String uri) {
((HttpRequest) message).setUri(uri);
return this;
}
@Override
public HttpMethod method() {
return ((HttpRequest) message).method();
}
@Override
public String uri() {
return ((HttpRequest) message).uri();
}
@Override
public FullHttpRequest setProtocolVersion(HttpVersion version) {
super.setProtocolVersion(version);
return this;
}
}
private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage
implements FullHttpResponse {
private AggregatedFullHttpResponse(HttpResponse message, ByteBuf content, HttpHeaders trailingHeaders) {
super(message, content, trailingHeaders);
}
@Override
public FullHttpResponse copy() {
DefaultFullHttpResponse copy = new DefaultFullHttpResponse(
protocolVersion(), status(), content().copy());
copy.headers().set(headers());
copy.trailingHeaders().set(trailingHeaders());
return copy;
}
@Override
public FullHttpResponse duplicate() {
DefaultFullHttpResponse duplicate = new DefaultFullHttpResponse(protocolVersion(), status(),
content().duplicate());
duplicate.headers().set(headers());
duplicate.trailingHeaders().set(trailingHeaders());
return duplicate;
}
@Override
public FullHttpResponse setStatus(HttpResponseStatus status) {
((HttpResponse) message).setStatus(status);
return this;
}
@Override
public HttpResponseStatus status() {
return ((HttpResponse) message).status();
}
@Override
public FullHttpResponse setProtocolVersion(HttpVersion version) {
super.setProtocolVersion(version);
return this;
}
@Override
public FullHttpResponse retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FullHttpResponse retain() {
super.retain();
return this;
}
@Override
public FullHttpResponse touch(Object hint) {
super.touch(hint);
return this;
}
@Override
public FullHttpResponse touch() {
super.touch();
return this;
}
}
} }

View File

@ -100,7 +100,7 @@ public class HttpObjectAggregatorTest {
assertEquals(chunk1.content().readableBytes() + chunk2.content().readableBytes(), assertEquals(chunk1.content().readableBytes() + chunk2.content().readableBytes(),
HttpHeaderUtil.getContentLength(aggratedMessage)); HttpHeaderUtil.getContentLength(aggratedMessage));
assertEquals(aggratedMessage.headers().get("X-Test"), Boolean.TRUE.toString()); assertEquals(aggratedMessage.headers().get("X-Test"), Boolean.TRUE.toString());
assertEquals(aggratedMessage.headers().get("X-Trailer"), Boolean.TRUE.toString()); assertEquals(aggratedMessage.trailingHeaders().get("X-Trailer"), Boolean.TRUE.toString());
checkContentBuffer(aggratedMessage); checkContentBuffer(aggratedMessage);
assertNull(embedder.readInbound()); assertNull(embedder.readInbound());
} }