Reduce memory copies in HttpContentDecoder and so also the risk of memory leaks

This commit is contained in:
Norman Maurer 2013-07-14 23:12:49 +02:00
parent ecb215c12f
commit 0393ffbfb2

View File

@ -16,8 +16,6 @@
package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
@ -108,15 +106,23 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
headers.set(HttpHeaders.Names.CONTENT_ENCODING, targetContentEncoding);
}
Object[] decoded = decodeContent(message, c);
out.add(message);
decodeContent(c, out);
// Replace the content.
// Replace the content length.
if (headers.contains(HttpHeaders.Names.CONTENT_LENGTH)) {
int contentLength = 0;
int size = out.size();
for (int i = 0; i < size; i++) {
Object o = out.get(i);
if (o instanceof HttpContent) {
contentLength += ((HttpContent) o).content().readableBytes();
}
}
headers.set(
HttpHeaders.Names.CONTENT_LENGTH,
Integer.toString(((ByteBufHolder) decoded[1]).content().readableBytes()));
Integer.toString(contentLength));
}
out.add(decoded);
return;
}
@ -129,7 +135,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
}
if (decoder != null) {
out.add(decodeContent(null, c));
decodeContent(c, out);
} else {
if (c instanceof LastHttpContent) {
decodeStarted = false;
@ -139,36 +145,17 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
}
}
private Object[] decodeContent(HttpMessage header, HttpContent c) {
ByteBuf newContent = Unpooled.buffer();
private void decodeContent(HttpContent c, List<Object> out) {
ByteBuf content = c.content();
decode(content, newContent);
decode(content, out);
if (c instanceof LastHttpContent) {
ByteBuf lastProduct = Unpooled.buffer();
finishDecode(lastProduct);
finishDecode(out);
// Generate an additional chunk if the decoder produced
// the last product on closure,
if (lastProduct.isReadable()) {
if (header == null) {
return new Object[] { new DefaultHttpContent(newContent), new DefaultLastHttpContent(lastProduct)};
} else {
return new Object[] { header, new DefaultHttpContent(newContent),
new DefaultLastHttpContent(lastProduct)};
}
} else {
if (header == null) {
return new Object[] { new DefaultLastHttpContent(newContent) };
} else {
return new Object[] { header, new DefaultLastHttpContent(newContent) };
}
}
}
if (header == null) {
return new Object[] { new DefaultHttpContent(newContent) };
} else {
return new Object[] { header, new DefaultHttpContent(newContent) };
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
}
}
@ -210,20 +197,28 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
private void cleanup() {
if (decoder != null) {
// Clean-up the previous decoder if not cleaned up correctly.
ByteBuf buf = Unpooled.buffer();
finishDecode(buf);
buf.release();
// Clean-up the previous encoder if not cleaned up correctly.
if (decoder.finish()) {
for (;;) {
ByteBuf buf = (ByteBuf) decoder.readOutbound();
if (buf == null) {
break;
}
// Release the buffer
buf.release();
}
}
decoder = null;
}
}
private void decode(ByteBuf in, ByteBuf out) {
// call retain as it will be release after is written
decoder.writeInbound(in.retain());
private void decode(ByteBuf in, List<Object> out) {
// call retain here as it will call release after its written to the channel
decoder.writeOutbound(in.retain());
fetchDecoderOutput(out);
}
private void finishDecode(ByteBuf out) {
private void finishDecode(List<Object> out) {
if (decoder.finish()) {
fetchDecoderOutput(out);
}
@ -231,14 +226,17 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
decoder = null;
}
private void fetchDecoderOutput(ByteBuf out) {
private void fetchDecoderOutput(List<Object> out) {
for (;;) {
ByteBuf buf = (ByteBuf) decoder.readInbound();
ByteBuf buf = (ByteBuf) decoder.readOutbound();
if (buf == null) {
break;
}
out.writeBytes(buf);
buf.release();
if (!buf.isReadable()) {
buf.release();
continue;
}
out.add(new DefaultHttpContent(buf));
}
}
}