ByteToMessageDecoder Cumulator improments (#9877)

Motivation:
ByteToMessageDecoder's default MERGE_CUMULATOR will allocate a new buffer and
copy if the refCnt() of the cumulation is > 1. However this is overly
conservative because we maybe able to avoid allocate/copy if the current
cumulation can accommodate the input buffer without a reallocation. Also when the
reallocation and copy does occur the new buffer is sized just large enough to
accommodate the current the current amount of data. If some data remains in the
cumulation after decode this will require a new allocation/copy when more data
arrives.

Modifications:
- Use maxFastWritableBytes to avoid allocation/copy if the current buffer can
  accommodate the input data without a reallocation operation.
- Use ByteBufAllocator#calculateNewCapacity(..) to get the size of the buffer
  when a reallocation/copy operation is necessary.

Result:
ByteToMessageDecoder MERGE_CUMULATOR won't allocate/copy if the cumulation
buffer can accommodate data without a reallocation, and when a reallocation
occurs we are more likely to leave additional space for future data in an effort
to reduce overall reallocations.
This commit is contained in:
Scott Mitchell 2019-12-13 09:48:25 -08:00 committed by Norman Maurer
parent ac76a24ff6
commit 33d576dbed
1 changed files with 28 additions and 30 deletions

View File

@ -31,6 +31,8 @@ import io.netty.util.internal.StringUtil;
import java.util.List;
import static java.lang.Integer.MAX_VALUE;
/**
* {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
* other Message type.
@ -79,20 +81,17 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
*/
public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> {
try {
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
cumulation = expandCumulation(alloc, cumulation, in);
} else {
cumulation.writeBytes(in);
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
return expandCumulation(alloc, cumulation, in);
}
return cumulation;
return cumulation.writeBytes(in);
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
@ -105,8 +104,8 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
* Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
* and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
*/
public static final Cumulator COMPOSITE_CUMULATOR = (alloc, cumulation, in) -> {
ByteBuf buffer;
try {
if (cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
@ -115,20 +114,18 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in);
} else {
CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
composite.addComponent(true, cumulation);
}
composite.addComponent(true, in);
in = null;
buffer = composite;
return expandCumulation(alloc, cumulation, in);
}
return buffer;
final CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
composite = alloc.compositeBuffer(MAX_VALUE);
composite.addComponent(true, cumulation);
}
composite.addComponent(true, in);
in = null;
return composite;
} finally {
if (in != null) {
// We must release if the ownership was not transferred as otherwise it may produce a leak if
@ -358,7 +355,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
ctx.fireUserEventTriggered(evt);
}
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
CodecOutputList out = CodecOutputList.newInstance();
try {
channelInputClosed(ctx, out);
@ -518,8 +515,9 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
}
}
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes());
private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(
oldCumulation.readableBytes() + in.readableBytes(), MAX_VALUE));
ByteBuf toRelease = newCumulation;
try {
newCumulation.writeBytes(oldCumulation);