HttpObjectAggregator Potential Leak

Motivation:
HttpObjectAggregator has a potential to leak if a new message is received before the existing message has completed, and if a HttpContent is received but maxContentLength has been exceeded, or the content length is too long.

Modifications:
- Make the HttpObjectAggregator more robust to leaks
- Remove the tooLongFrameFound member variable

Result:
More robust HttpObjectAggregator with less chance of leaks
This commit is contained in:
Scott Mitchell 2016-08-04 20:54:27 -07:00
parent cf71e5bae2
commit b712480b56

View File

@ -99,7 +99,6 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
private final int maxContentLength;
private AggregatedFullHttpMessage currentMessage;
private boolean tooLongFrameFound;
private final boolean closeOnExpectationFailed;
private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
@ -168,18 +167,17 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
@Override
protected void decode(final ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
AggregatedFullHttpMessage currentMessage = this.currentMessage;
if (msg instanceof HttpMessage) {
tooLongFrameFound = false;
assert currentMessage == null;
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
throw new IllegalStateException("Start of new message received before existing message completed.");
}
HttpMessage m = (HttpMessage) msg;
// Handle the 'Expect: 100-continue' header if necessary.
if (is100ContinueExpected(m)) {
if (HttpHeaders.getContentLength(m, 0) > maxContentLength) {
tooLongFrameFound = true;
final ChannelFuture future = ctx.writeAndFlush(EXPECTATION_FAILED.duplicate().retain());
future.addListener(new ChannelFutureListener() {
@Override
@ -208,18 +206,16 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
if (!m.getDecoderResult().isSuccess()) {
removeTransferEncodingChunked(m);
out.add(toFullMessage(m));
this.currentMessage = null;
return;
}
if (msg instanceof HttpRequest) {
HttpRequest header = (HttpRequest) msg;
this.currentMessage = currentMessage = new AggregatedFullHttpRequest(
currentMessage = new AggregatedFullHttpRequest(
header, ctx.alloc().compositeBuffer(maxCumulationBufferComponents), null);
} else if (msg instanceof HttpResponse) {
HttpResponse header = (HttpResponse) msg;
this.currentMessage = currentMessage = new AggregatedFullHttpResponse(
header,
Unpooled.compositeBuffer(maxCumulationBufferComponents), null);
currentMessage = new AggregatedFullHttpResponse(
header, ctx.alloc().compositeBuffer(maxCumulationBufferComponents), null);
} else {
throw new Error();
}
@ -227,25 +223,20 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
// A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
removeTransferEncodingChunked(currentMessage);
} else if (msg instanceof HttpContent) {
if (tooLongFrameFound) {
if (msg instanceof LastHttpContent) {
this.currentMessage = null;
}
// already detect the too long frame so just discard the content
if (currentMessage == null) {
// it is possible that a TooLongFrameException was already thrown but we can still discard data
// until the begging of the next request/response.
return;
}
assert currentMessage != null;
// Merge the received chunk into the content of the current message.
HttpContent chunk = (HttpContent) msg;
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
if (content.readableBytes() > maxContentLength - chunk.content().readableBytes()) {
tooLongFrameFound = true;
// release current message to prevent leaks
currentMessage.release();
this.currentMessage = null;
currentMessage = null;
throw new TooLongFrameException(
"HTTP content length exceeded " + maxContentLength +
@ -254,9 +245,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
// Append the content of the chunk
if (chunk.content().isReadable()) {
chunk.retain();
content.addComponent(chunk.content());
content.writerIndex(content.writerIndex() + chunk.content().readableBytes());
content.addComponent(true, chunk.content().retain());
}
final boolean last;
@ -269,8 +258,6 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
}
if (last) {
this.currentMessage = null;
// Merge trailing headers into the message.
if (chunk instanceof LastHttpContent) {
LastHttpContent trailer = (LastHttpContent) chunk;
@ -290,7 +277,9 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
Names.CONTENT_LENGTH,
String.valueOf(content.readableBytes()));
}
// All done
// Set our currentMessage member variable to null in case adding to out will cause re-entry.
AggregatedFullHttpMessage currentMessage = this.currentMessage;
this.currentMessage = null;
out.add(currentMessage);
}
} else {
@ -300,12 +289,11 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
// release current message if it is not null as it may be a left-over
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
try {
super.channelInactive(ctx);
} finally {
// release current message if it is not null as it may be a left-over
releaseCurrentMessage();
}
}
@ -316,9 +304,16 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
// release current message if it is not null as it may be a left-over as there is not much more we can do in
// this case
try {
super.handlerRemoved(ctx);
} finally {
// release current message if it is not null as it may be a left-over as there is not much more we can do in
// this case
releaseCurrentMessage();
}
}
private void releaseCurrentMessage() {
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;