From ee7027288ec5f3d688be3cee45a521a5786ccf71 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Fri, 13 Dec 2019 09:48:25 -0800 Subject: [PATCH] ByteToMessageDecoder Cumulator improments (#9877) Motivation: ByteToMessageDecoder's default MERGE_CUMULATOR will allocate a new buffer and copy if the refCnt() of the cumulation is > 1. However this is overly conservative because we maybe able to avoid allocate/copy if the current cumulation can accommodate the input buffer without a reallocation. Also when the reallocation and copy does occur the new buffer is sized just large enough to accommodate the current the current amount of data. If some data remains in the cumulation after decode this will require a new allocation/copy when more data arrives. Modifications: - Use maxFastWritableBytes to avoid allocation/copy if the current buffer can accommodate the input data without a reallocation operation. - Use ByteBufAllocator#calculateNewCapacity(..) to get the size of the buffer when a reallocation/copy operation is necessary. Result: ByteToMessageDecoder MERGE_CUMULATOR won't allocate/copy if the cumulation buffer can accommodate data without a reallocation, and when a reallocation occurs we are more likely to leave additional space for future data in an effort to reduce overall reallocations. --- .../handler/codec/ByteToMessageDecoder.java | 60 +++++++++---------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 445341838b..083b4ee1be 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -15,8 +15,6 @@ */ package io.netty.handler.codec; -import static io.netty.util.internal.ObjectUtil.checkPositive; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; @@ -30,6 +28,9 @@ import io.netty.util.internal.StringUtil; import java.util.List; +import static io.netty.util.internal.ObjectUtil.checkPositive; +import static java.lang.Integer.MAX_VALUE; + /** * {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an * other Message type. @@ -80,20 +81,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { try { - if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() - || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { - // Expand cumulation (by replace it) when either there is not more room in the buffer - // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or - // duplicate().retain() or if its read-only. - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - cumulation = expandCumulation(alloc, cumulation, in); - } else { - cumulation.writeBytes(in); + final int required = in.readableBytes(); + if (required > cumulation.maxWritableBytes() || + (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || + cumulation.isReadOnly()) { + // Expand cumulation (by replacing it) under the following conditions: + // - cumulation cannot be resized to accommodate the additional data + // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is + // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. + return expandCumulation(alloc, cumulation, in); } - return cumulation; + return cumulation.writeBytes(in); } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) @@ -110,7 +108,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { - ByteBuf buffer; try { if (cumulation.refCnt() > 1) { // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the @@ -119,20 +116,18 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, cumulation, in); - } else { - CompositeByteBuf composite; - if (cumulation instanceof CompositeByteBuf) { - composite = (CompositeByteBuf) cumulation; - } else { - composite = alloc.compositeBuffer(Integer.MAX_VALUE); - composite.addComponent(true, cumulation); - } - composite.addComponent(true, in); - in = null; - buffer = composite; + return expandCumulation(alloc, cumulation, in); } - return buffer; + final CompositeByteBuf composite; + if (cumulation instanceof CompositeByteBuf) { + composite = (CompositeByteBuf) cumulation; + } else { + composite = alloc.compositeBuffer(MAX_VALUE); + composite.addComponent(true, cumulation); + } + composite.addComponent(true, in); + in = null; + return composite; } finally { if (in != null) { // We must release if the ownership was not transferred as otherwise it may produce a leak if @@ -362,7 +357,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter super.userEventTriggered(ctx, evt); } - private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception { + private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) { CodecOutputList out = CodecOutputList.newInstance(); try { channelInputClosed(ctx, out); @@ -522,8 +517,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } } - static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { - ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes()); + private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { + ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity( + oldCumulation.readableBytes() + in.readableBytes(), MAX_VALUE)); ByteBuf toRelease = newCumulation; try { newCumulation.writeBytes(oldCumulation);