Ensure ByteToMessageDecoder.Cumulator implementations always release in buffer. (#8325)
Motivation: We need to ensure the Cumulator always releases the input buffer if it can not take over the ownership of it as otherwise it may leak. Modifications: - Correctly ensure the buffer is always released. - Add unit tests. Result: Ensure buffer is always released.
This commit is contained in:
parent
ba594bcf4a
commit
c546ab20a1
@ -75,6 +75,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
|
||||
@Override
|
||||
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
|
||||
try {
|
||||
final ByteBuf buffer;
|
||||
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|
||||
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
|
||||
@ -90,8 +91,12 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
buffer = cumulation;
|
||||
}
|
||||
buffer.writeBytes(in);
|
||||
in.release();
|
||||
return buffer;
|
||||
} finally {
|
||||
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
|
||||
// for whatever release (for example because of OutOfMemoryError)
|
||||
in.release();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -104,16 +109,16 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
@Override
|
||||
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
|
||||
ByteBuf buffer;
|
||||
try {
|
||||
if (cumulation.refCnt() > 1) {
|
||||
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
|
||||
// use slice().retain() or duplicate().retain().
|
||||
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
|
||||
// user use slice().retain() or duplicate().retain().
|
||||
//
|
||||
// See:
|
||||
// - https://github.com/netty/netty/issues/2327
|
||||
// - https://github.com/netty/netty/issues/1764
|
||||
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
|
||||
buffer.writeBytes(in);
|
||||
in.release();
|
||||
} else {
|
||||
CompositeByteBuf composite;
|
||||
if (cumulation instanceof CompositeByteBuf) {
|
||||
@ -123,9 +128,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
composite.addComponent(true, cumulation);
|
||||
}
|
||||
composite.addComponent(true, in);
|
||||
in = null;
|
||||
buffer = composite;
|
||||
}
|
||||
return buffer;
|
||||
} finally {
|
||||
if (in != null) {
|
||||
// We must release if the ownership was not transfered as otherwise it may produce a leak if
|
||||
// writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
|
||||
in.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -16,7 +16,10 @@
|
||||
package io.netty.handler.codec;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledHeapByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
@ -27,10 +30,7 @@ import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class ByteToMessageDecoderTest {
|
||||
|
||||
@ -305,4 +305,44 @@ public class ByteToMessageDecoderTest {
|
||||
assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] { (byte) 2 })));
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void releaseWhenMergeCumulateThrows() {
|
||||
final Error error = new Error();
|
||||
|
||||
ByteBuf cumulation = new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, 0, 64) {
|
||||
@Override
|
||||
public ByteBuf writeBytes(ByteBuf src) {
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
ByteBuf in = Unpooled.buffer().writeZero(12);
|
||||
try {
|
||||
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in);
|
||||
fail();
|
||||
} catch (Error expected) {
|
||||
assertSame(error, expected);
|
||||
assertEquals(0, in.refCnt());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void releaseWhenCompositeCumulateThrows() {
|
||||
final Error error = new Error();
|
||||
|
||||
ByteBuf cumulation = new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 64) {
|
||||
@Override
|
||||
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
ByteBuf in = Unpooled.buffer().writeZero(12);
|
||||
try {
|
||||
ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in);
|
||||
fail();
|
||||
} catch (Error expected) {
|
||||
assertSame(error, expected);
|
||||
assertEquals(0, in.refCnt());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user