ByteToMessageDecoder Cumulator improments (#9877)

Motivation:
ByteToMessageDecoder's default MERGE_CUMULATOR will allocate a new buffer and
copy if the refCnt() of the cumulation is > 1. However this is overly
conservative because we maybe able to avoid allocate/copy if the current
cumulation can accommodate the input buffer without a reallocation. Also when the
reallocation and copy does occur the new buffer is sized just large enough to
accommodate the current the current amount of data. If some data remains in the
cumulation after decode this will require a new allocation/copy when more data
arrives.

Modifications:
- Use maxFastWritableBytes to avoid allocation/copy if the current buffer can
  accommodate the input data without a reallocation operation.
- Use ByteBufAllocator#calculateNewCapacity(..) to get the size of the buffer
  when a reallocation/copy operation is necessary.

Result:
ByteToMessageDecoder MERGE_CUMULATOR won't allocate/copy if the cumulation
buffer can accommodate data without a reallocation, and when a reallocation
occurs we are more likely to leave additional space for future data in an effort
to reduce overall reallocations.
This commit is contained in:
Scott Mitchell 2019-12-13 09:48:25 -08:00 committed by GitHub
parent e7091c04d7
commit ee7027288e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -15,8 +15,6 @@
*/
package io.netty.handler.codec;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
@ -30,6 +28,9 @@ import io.netty.util.internal.StringUtil;
import java.util.List;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.lang.Integer.MAX_VALUE;
/**
* {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
* other Message type.
@ -80,20 +81,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
try {
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
cumulation = expandCumulation(alloc, cumulation, in);
} else {
cumulation.writeBytes(in);
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
return expandCumulation(alloc, cumulation, in);
}
return cumulation;
return cumulation.writeBytes(in);
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
@ -110,7 +108,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
try {
if (cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
@ -119,20 +116,18 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in);
} else {
CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
composite.addComponent(true, cumulation);
}
composite.addComponent(true, in);
in = null;
buffer = composite;
return expandCumulation(alloc, cumulation, in);
}
return buffer;
final CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
composite = alloc.compositeBuffer(MAX_VALUE);
composite.addComponent(true, cumulation);
}
composite.addComponent(true, in);
in = null;
return composite;
} finally {
if (in != null) {
// We must release if the ownership was not transferred as otherwise it may produce a leak if
@ -362,7 +357,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
super.userEventTriggered(ctx, evt);
}
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
CodecOutputList out = CodecOutputList.newInstance();
try {
channelInputClosed(ctx, out);
@ -522,8 +517,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
}
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes());
private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(
oldCumulation.readableBytes() + in.readableBytes(), MAX_VALUE));
ByteBuf toRelease = newCumulation;
try {
newCumulation.writeBytes(oldCumulation);