diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 083b4ee1be..46bd380948 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -80,6 +80,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { + if (!cumulation.isReadable() && in.isContiguous()) { + // If cumulation is empty and input buffer is contiguous, use it directly + cumulation.release(); + return in; + } try { final int required = in.readableBytes(); if (required > cumulation.maxWritableBytes() || @@ -91,7 +96,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. return expandCumulation(alloc, cumulation, in); } - return cumulation.writeBytes(in); + cumulation.writeBytes(in, in.readerIndex(), required); + in.readerIndex(in.writerIndex()); + return cumulation; } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) @@ -108,31 +115,33 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { + if (!cumulation.isReadable()) { + cumulation.release(); + return in; + } + CompositeByteBuf composite = null; try { - if (cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the - // user use slice().retain() or duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - return expandCumulation(alloc, cumulation, in); - } - final CompositeByteBuf composite; - if (cumulation instanceof CompositeByteBuf) { + if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) { composite = (CompositeByteBuf) cumulation; + // Writer index must equal capacity if we are going to "write" + // new components to the end + if (composite.writerIndex() != composite.capacity()) { + composite.capacity(composite.writerIndex()); + } } else { - composite = alloc.compositeBuffer(MAX_VALUE); - composite.addComponent(true, cumulation); + composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation); } - composite.addComponent(true, in); + composite.addFlattenedComponents(true, in); in = null; return composite; } finally { if (in != null) { - // We must release if the ownership was not transferred as otherwise it may produce a leak if - // writeBytes(...) throw for whatever release (for example because of OutOfMemoryError). + // We must release if the ownership was not transferred as otherwise it may produce a leak in.release(); + // Also release any new buffer allocated if we're not returning it + if (composite != null && composite != cumulation) { + composite.release(); + } } } } @@ -261,13 +270,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { - ByteBuf data = (ByteBuf) msg; first = cumulation == null; - if (first) { - cumulation = data; - } else { - cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); - } + cumulation = cumulator.cumulate(ctx.alloc(), + first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg); callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; @@ -517,13 +522,18 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } } - private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { - ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity( - oldCumulation.readableBytes() + in.readableBytes(), MAX_VALUE)); + static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { + int oldBytes = oldCumulation.readableBytes(); + int newBytes = in.readableBytes(); + int totalBytes = oldBytes + newBytes; + ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE)); ByteBuf toRelease = newCumulation; try { - newCumulation.writeBytes(oldCumulation); - newCumulation.writeBytes(in); + // This avoids redundant checks and stack depth compared to calling writeBytes(...) + newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes) + .setBytes(oldBytes, in, in.readerIndex(), newBytes) + .writerIndex(totalBytes); + in.readerIndex(in.writerIndex()); toRelease = oldCumulation; return newCumulation; } finally { diff --git a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java index 4279ece838..3db8022c41 100644 --- a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java @@ -325,11 +325,11 @@ public class ByteToMessageDecoderTest { } @Override - public ByteBuf writeBytes(ByteBuf src) { + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { if (--untilFailure <= 0) { throw error; } - return super.writeBytes(src); + return super.setBytes(index, src, srcIndex, length); } Error writeError() { @@ -340,6 +340,7 @@ public class ByteToMessageDecoderTest { @Test public void releaseWhenMergeCumulateThrows() { WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64); + oldCumulation.writeZero(1); ByteBuf in = Unpooled.buffer().writeZero(12); Throwable thrown = null; @@ -363,7 +364,7 @@ public class ByteToMessageDecoderTest { } private void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) { - ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8); + ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8).writeZero(1); final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16); ByteBufAllocator allocator = new AbstractByteBufAllocator(false) { @@ -415,7 +416,11 @@ public class ByteToMessageDecoderTest { public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { throw error; } - }; + @Override + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer) { + throw error; + } + }.writeZero(1); ByteBuf in = Unpooled.buffer().writeZero(12); try { ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in);