[#4275] Discard bytes after X reads to guard against OOME.

Motivation:

If a remote peer writes fast enough it may take a long time to have fireChannelReadComplete(...) triggered. Because of this we need to take special care and ensure we try to discard some bytes if channelRead(...) is called to often in ByteToMessageDecoder.

Modifications:

- Add ByteToMessageDecoder.setDiscardAfterReads(...) which allows to set the number of reads after which we try to discard the read bytes
- Use default value of 16 for max reads.

Result:

No risk of OOME.
This commit is contained in:
Norman Maurer 2015-09-25 11:35:46 +02:00
parent a81d63eb55
commit f96777312d

View File

@ -135,6 +135,8 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
private boolean singleDecode; private boolean singleDecode;
private boolean decodeWasNull; private boolean decodeWasNull;
private boolean first; private boolean first;
private int discardAfterReads = 16;
private int numReads;
protected ByteToMessageDecoder() { protected ByteToMessageDecoder() {
CodecUtil.ensureNotSharable(this); CodecUtil.ensureNotSharable(this);
@ -170,6 +172,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
this.cumulator = cumulator; this.cumulator = cumulator;
} }
/**
* Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
* The default is {@code 16}.
*/
public void setDiscardAfterReads(int discardAfterReads) {
if (discardAfterReads <= 0) {
throw new IllegalArgumentException("discardAfterReads must be > 0");
}
this.discardAfterReads = discardAfterReads;
}
/** /**
* Returns the actual number of readable bytes in the internal cumulative * Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value * buffer of this decoder. You usually do not need to rely on this value
@ -205,6 +218,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
buf.release(); buf.release();
} }
cumulation = null; cumulation = null;
numReads = 0;
ctx.fireChannelReadComplete(); ctx.fireChannelReadComplete();
handlerRemoved0(ctx); handlerRemoved0(ctx);
} }
@ -234,9 +248,16 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
throw new DecoderException(t); throw new DecoderException(t);
} finally { } finally {
if (cumulation != null && !cumulation.isReadable()) { if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release(); cumulation.release();
cumulation = null; cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
} }
int size = out.size(); int size = out.size();
decodeWasNull = size == 0; decodeWasNull = size == 0;
@ -252,6 +273,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes(); discardSomeReadBytes();
if (decodeWasNull) { if (decodeWasNull) {
decodeWasNull = false; decodeWasNull = false;