[#4275] Discard bytes after X reads to guard against OOME.
Motivation: If a remote peer writes fast enough it may take a long time to have fireChannelReadComplete(...) triggered. Because of this we need to take special care and ensure we try to discard some bytes if channelRead(...) is called to often in ByteToMessageDecoder. Modifications: - Add ByteToMessageDecoder.setDiscardAfterReads(...) which allows to set the number of reads after which we try to discard the read bytes - Use default value of 16 for max reads. Result: No risk of OOME.
This commit is contained in:
parent
a09b8c18b9
commit
3540ae64ba
@ -136,6 +136,8 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
||||
private boolean singleDecode;
|
||||
private boolean decodeWasNull;
|
||||
private boolean first;
|
||||
private int discardAfterReads = 16;
|
||||
private int numReads;
|
||||
|
||||
protected ByteToMessageDecoder() {
|
||||
CodecUtil.ensureNotSharable(this);
|
||||
@ -171,6 +173,17 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
||||
this.cumulator = cumulator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
|
||||
* The default is {@code 16}.
|
||||
*/
|
||||
public void setDiscardAfterReads(int discardAfterReads) {
|
||||
if (discardAfterReads <= 0) {
|
||||
throw new IllegalArgumentException("discardAfterReads must be > 0");
|
||||
}
|
||||
this.discardAfterReads = discardAfterReads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the actual number of readable bytes in the internal cumulative
|
||||
* buffer of this decoder. You usually do not need to rely on this value
|
||||
@ -206,6 +219,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
||||
buf.release();
|
||||
}
|
||||
cumulation = null;
|
||||
numReads = 0;
|
||||
ctx.fireChannelReadComplete();
|
||||
handlerRemoved0(ctx);
|
||||
}
|
||||
@ -235,9 +249,16 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
||||
throw new DecoderException(t);
|
||||
} finally {
|
||||
if (cumulation != null && !cumulation.isReadable()) {
|
||||
numReads = 0;
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
} else if (++ numReads >= discardAfterReads) {
|
||||
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
|
||||
// See https://github.com/netty/netty/issues/4275
|
||||
numReads = 0;
|
||||
discardSomeReadBytes();
|
||||
}
|
||||
|
||||
int size = out.size();
|
||||
decodeWasNull = size == 0;
|
||||
|
||||
@ -253,6 +274,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
numReads = 0;
|
||||
discardSomeReadBytes();
|
||||
if (decodeWasNull) {
|
||||
decodeWasNull = false;
|
||||
|
Loading…
Reference in New Issue
Block a user