MessageAggregator Potential Leak

Motivation:
MessageAggregator has a potential to leak if a new message is received before the existing message has completed, and if a HttpContent is received but maxContentLength has been exceeded, or the content length is too long.

Modifications:

- Make the HttpObjectAggregator more robust to leaks
- Reduce dependance on handlingOversizedMessage but instead rely on the more general check of a null currentMessage

Result:
More robust MessageAggregator with less chance of leaks
This commit is contained in:
Scott Mitchell 2016-08-04 22:14:48 -07:00
parent eb7f751ba5
commit e3462a79c7

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
@ -28,6 +27,8 @@ import io.netty.util.ReferenceCountUtil;
import java.util.List;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
/**
* An abstract {@link ChannelHandler} that aggregates a series of message objects into a single aggregated message.
* <p>
@ -174,6 +175,10 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
}
}
/**
* @deprecated This method will be removed in future releases.
*/
@Deprecated
public final boolean isHandlingOversizedMessage() {
return handlingOversizedMessage;
}
@ -187,11 +192,11 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
@Override
protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
O currentMessage = this.currentMessage;
if (isStartMessage(msg)) {
handlingOversizedMessage = false;
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
throw new MessageAggregationException();
}
@ -239,11 +244,10 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
if (m instanceof ByteBufHolder && ((ByteBufHolder) m).content().isReadable()) {
aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
} else {
aggregated = beginAggregation(m, Unpooled.EMPTY_BUFFER);
aggregated = beginAggregation(m, EMPTY_BUFFER);
}
finishAggregation(aggregated);
out.add(aggregated);
this.currentMessage = null;
return;
}
@ -252,30 +256,21 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
if (m instanceof ByteBufHolder) {
appendPartialContent(content, ((ByteBufHolder) m).content());
}
this.currentMessage = beginAggregation(m, content);
currentMessage = beginAggregation(m, content);
} else if (isContentMessage(msg)) {
@SuppressWarnings("unchecked")
final C m = (C) msg;
final ByteBuf partialContent = ((ByteBufHolder) msg).content();
final boolean isLastContentMessage = isLastContentMessage(m);
if (handlingOversizedMessage) {
if (isLastContentMessage) {
this.currentMessage = null;
}
// already detect the too long frame so just discard the content
return;
}
if (currentMessage == null) {
throw new MessageAggregationException();
// it is possible that a TooLongFrameException was already thrown but we can still discard data
// until the begging of the next request/response.
return;
}
// Merge the received chunk into the content of the current message.
CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
@SuppressWarnings("unchecked")
final C m = (C) msg;
// Handle oversized message.
if (content.readableBytes() > maxContentLength - partialContent.readableBytes()) {
if (content.readableBytes() > maxContentLength - m.content().readableBytes()) {
// By convention, full message type extends first message type.
@SuppressWarnings("unchecked")
S s = (S) currentMessage;
@ -284,7 +279,7 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
}
// Append the content of the chunk.
appendPartialContent(content, partialContent);
appendPartialContent(content, m.content());
// Give the subtypes a chance to merge additional information such as trailing headers.
aggregate(currentMessage, m);
@ -299,10 +294,10 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
}
last = true;
} else {
last = isLastContentMessage;
last = isLastContentMessage(m);
}
} else {
last = isLastContentMessage;
last = isLastContentMessage(m);
}
if (last) {
@ -310,7 +305,7 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
// All done
out.add(currentMessage);
this.currentMessage = null;
currentMessage = null;
}
} else {
throw new MessageAggregationException();
@ -319,8 +314,7 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) {
if (partialContent.isReadable()) {
partialContent.retain();
content.addComponent(true, partialContent);
content.addComponent(true, partialContent.retain());
}
}
@ -344,10 +338,11 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
throws Exception;
/**
* Determine if the channel should be closed after the result of {@link #newContinueResponse(Object)} is written.
* @param msg The return value from {@link #newContinueResponse(Object)}.
* @return {@code true} if the channel should be closed after the result of {@link #newContinueResponse(Object)}
* is written. {@code false} otherwise.
* Determine if the channel should be closed after the result of
* {@link #newContinueResponse(Object, int, ChannelPipeline)} is written.
* @param msg The return value from {@link #newContinueResponse(Object, int, ChannelPipeline)}.
* @return {@code true} if the channel should be closed after the result of
* {@link #newContinueResponse(Object, int, ChannelPipeline)} is written. {@code false} otherwise.
*/
protected abstract boolean closeAfterContinueResponse(Object msg) throws Exception;
@ -355,7 +350,7 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
* Determine if all objects for the current request/response should be ignored or not.
* Messages will stop being ignored the next time {@link #isContentMessage(Object)} returns {@code true}.
*
* @param msg The return value from {@link #newContinueResponse(Object)}.
* @param msg The return value from {@link #newContinueResponse(Object, int, ChannelPipeline)}.
* @return {@code true} if all objects for the current request/response should be ignored or not.
* {@code false} otherwise.
*/
@ -406,13 +401,12 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// release current message if it is not null as it may be a left-over
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
try {
// release current message if it is not null as it may be a left-over
super.channelInactive(ctx);
} finally {
releaseCurrentMessage();
}
super.channelInactive(ctx);
}
@Override
@ -422,13 +416,20 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
try {
super.handlerRemoved(ctx);
} finally {
// release current message if it is not null as it may be a left-over as there is not much more we can do in
// this case
releaseCurrentMessage();
}
}
// release current message if it is not null as it may be a left-over as there is not much more we can do in
// this case
private void releaseCurrentMessage() {
if (currentMessage != null) {
currentMessage.release();
currentMessage = null;
handlingOversizedMessage = false;
}
}
}