diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 9d52b5506e..fec73f8fc9 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -75,23 +75,28 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { - final ByteBuf buffer; - if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() + try { + final ByteBuf buffer; + if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { - // Expand cumulation (by replace it) when either there is not more room in the buffer - // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or - // duplicate().retain() or if its read-only. - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, cumulation, in.readableBytes()); - } else { - buffer = cumulation; + // Expand cumulation (by replace it) when either there is not more room in the buffer + // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or + // duplicate().retain() or if its read-only. + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, cumulation, in.readableBytes()); + } else { + buffer = cumulation; + } + buffer.writeBytes(in); + return buffer; + } finally { + // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw + // for whatever release (for example because of OutOfMemoryError) + in.release(); } - buffer.writeBytes(in); - in.release(); - return buffer; } }; @@ -104,28 +109,36 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; - if (cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user - // use slice().retain() or duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, cumulation, in.readableBytes()); - buffer.writeBytes(in); - in.release(); - } else { - CompositeByteBuf composite; - if (cumulation instanceof CompositeByteBuf) { - composite = (CompositeByteBuf) cumulation; + try { + if (cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the + // user use slice().retain() or duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, cumulation, in.readableBytes()); + buffer.writeBytes(in); } else { - composite = alloc.compositeBuffer(Integer.MAX_VALUE); - composite.addComponent(true, cumulation); + CompositeByteBuf composite; + if (cumulation instanceof CompositeByteBuf) { + composite = (CompositeByteBuf) cumulation; + } else { + composite = alloc.compositeBuffer(Integer.MAX_VALUE); + composite.addComponent(true, cumulation); + } + composite.addComponent(true, in); + in = null; + buffer = composite; + } + return buffer; + } finally { + if (in != null) { + // We must release if the ownership was not transfered as otherwise it may produce a leak if + // writeBytes(...) throw for whatever release (for example because of OutOfMemoryError). + in.release(); } - composite.addComponent(true, in); - buffer = composite; } - return buffer; } }; diff --git a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java index 8462ee4a02..93dd2218ac 100644 --- a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java @@ -16,7 +16,10 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledHeapByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.embedded.EmbeddedChannel; @@ -27,10 +30,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class ByteToMessageDecoderTest { @@ -305,4 +305,44 @@ public class ByteToMessageDecoderTest { assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] { (byte) 2 }))); assertFalse(channel.finish()); } + + @Test + public void releaseWhenMergeCumulateThrows() { + final Error error = new Error(); + + ByteBuf cumulation = new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, 0, 64) { + @Override + public ByteBuf writeBytes(ByteBuf src) { + throw error; + } + }; + ByteBuf in = Unpooled.buffer().writeZero(12); + try { + ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in); + fail(); + } catch (Error expected) { + assertSame(error, expected); + assertEquals(0, in.refCnt()); + } + } + + @Test + public void releaseWhenCompositeCumulateThrows() { + final Error error = new Error(); + + ByteBuf cumulation = new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 64) { + @Override + public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { + throw error; + } + }; + ByteBuf in = Unpooled.buffer().writeZero(12); + try { + ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in); + fail(); + } catch (Error expected) { + assertSame(error, expected); + assertEquals(0, in.refCnt()); + } + } }