diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 445341838b..083b4ee1be 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -15,8 +15,6 @@ */ package io.netty.handler.codec; -import static io.netty.util.internal.ObjectUtil.checkPositive; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; @@ -30,6 +28,9 @@ import io.netty.util.internal.StringUtil; import java.util.List; +import static io.netty.util.internal.ObjectUtil.checkPositive; +import static java.lang.Integer.MAX_VALUE; + /** * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an * other Message type. @@ -80,20 +81,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { try { - if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() - || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { - // Expand cumulation (by replace it) when either there is not more room in the buffer - // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or - // duplicate().retain() or if its read-only. - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - cumulation = expandCumulation(alloc, cumulation, in); - } else { - cumulation.writeBytes(in); + final int required = in.readableBytes(); + if (required > cumulation.maxWritableBytes() || + (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || + cumulation.isReadOnly()) { + // Expand cumulation (by replacing it) under the following conditions: + // - cumulation cannot be resized to accommodate the additional data + // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is + // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. + return expandCumulation(alloc, cumulation, in); } - return cumulation; + return cumulation.writeBytes(in); } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) @@ -110,7 +108,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { - ByteBuf buffer; try { if (cumulation.refCnt() > 1) { // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the @@ -119,20 +116,18 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, cumulation, in); - } else { - CompositeByteBuf composite; - if (cumulation instanceof CompositeByteBuf) { - composite = (CompositeByteBuf) cumulation; - } else { - composite = alloc.compositeBuffer(Integer.MAX_VALUE); - composite.addComponent(true, cumulation); - } - composite.addComponent(true, in); - in = null; - buffer = composite; + return expandCumulation(alloc, cumulation, in); } - return buffer; + final CompositeByteBuf composite; + if (cumulation instanceof CompositeByteBuf) { + composite = (CompositeByteBuf) cumulation; + } else { + composite = alloc.compositeBuffer(MAX_VALUE); + composite.addComponent(true, cumulation); + } + composite.addComponent(true, in); + in = null; + return composite; } finally { if (in != null) { // We must release if the ownership was not transferred as otherwise it may produce a leak if @@ -362,7 +357,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter super.userEventTriggered(ctx, evt); } - private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception { + private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) { CodecOutputList out = CodecOutputList.newInstance(); try { channelInputClosed(ctx, out); @@ -522,8 +517,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } } - static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { - ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes()); + private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { + ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity( + oldCumulation.readableBytes() + in.readableBytes(), MAX_VALUE)); ByteBuf toRelease = newCumulation; try { newCumulation.writeBytes(oldCumulation);