diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index a51ab57105..1c5df26c31 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -90,6 +90,11 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies. */ public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> { + if (!cumulation.isReadable() && in.isContiguous()) { + // If cumulation is empty and input buffer is contiguous, use it directly + cumulation.release(); + return in; + } try { final int required = in.readableBytes(); if (required > cumulation.maxWritableBytes() || @@ -101,7 +106,9 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. return expandCumulation(alloc, cumulation, in); } - return cumulation.writeBytes(in); + cumulation.writeBytes(in, in.readerIndex(), required); + in.readerIndex(in.writerIndex()); + return cumulation; } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) @@ -116,31 +123,33 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { */ public static final Cumulator COMPOSITE_CUMULATOR = (alloc, cumulation, in) -> { + if (!cumulation.isReadable()) { + cumulation.release(); + return in; + } + CompositeByteBuf composite = null; try { - if (cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the - // user use slice().retain() or duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - return expandCumulation(alloc, cumulation, in); - } - final CompositeByteBuf composite; - if (cumulation instanceof CompositeByteBuf) { + if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) { composite = (CompositeByteBuf) cumulation; + // Writer index must equal capacity if we are going to "write" + // new components to the end + if (composite.writerIndex() != composite.capacity()) { + composite.capacity(composite.writerIndex()); + } } else { - composite = alloc.compositeBuffer(MAX_VALUE); - composite.addComponent(true, cumulation); + composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation); } - composite.addComponent(true, in); + composite.addFlattenedComponents(true, in); in = null; return composite; } finally { if (in != null) { - // We must release if the ownership was not transferred as otherwise it may produce a leak if - // writeBytes(...) throw for whatever release (for example because of OutOfMemoryError). + // We must release if the ownership was not transferred as otherwise it may produce a leak in.release(); + // Also release any new buffer allocated if we're not returning it + if (composite != null && composite != cumulation) { + composite.release(); + } } } }; diff --git a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java index eda48af9a8..9822d31d26 100644 --- a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java @@ -324,11 +324,11 @@ public class ByteToMessageDecoderTest { } @Override - public ByteBuf writeBytes(ByteBuf src) { + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { if (--untilFailure <= 0) { throw error; } - return super.writeBytes(src); + return super.setBytes(index, src, srcIndex, length); } Error writeError() { @@ -339,6 +339,7 @@ public class ByteToMessageDecoderTest { @Test public void releaseWhenMergeCumulateThrows() { WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64); + oldCumulation.writeZero(1); ByteBuf in = Unpooled.buffer().writeZero(12); Throwable thrown = null; @@ -362,7 +363,7 @@ public class ByteToMessageDecoderTest { } private void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) { - ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8); + ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8).writeZero(1); final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16); ByteBufAllocator allocator = new AbstractByteBufAllocator(false) { @@ -414,7 +415,11 @@ public class ByteToMessageDecoderTest { public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { throw error; } - }; + @Override + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) { + throw error; + } + }.writeZero(1); ByteBuf in = Unpooled.buffer().writeZero(12); try { ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in);