diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index e7c8bc5149..7bbc2bb113 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -16,6 +16,8 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -61,13 +63,75 @@ import java.util.List; * Be aware that sub-classes of {@link ByteToMessageDecoder} MUST NOT * annotated with {@link @Sharable}. *
- * Some methods such as {@link ByteBuf.readBytes(int)} will cause a memory leak if the returned buffer - * is not released or added to the out {@link List}. Use derived buffers like {@link ByteBuf.readSlice(int)} + * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer + * is not released or added to the out {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)} * to avoid leaking memory. */ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter { + /** + * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies. + */ + public static final Cumulator MERGE_CUMULATOR = new Cumulator() { + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { + ByteBuf buffer; + if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() + || cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when either there is not more room in the buffer + // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or + // duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, cumulation, in.readableBytes()); + } else { + buffer = cumulation; + } + buffer.writeBytes(in); + in.release(); + return buffer; + } + }; + + /** + * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible. + * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case + * and the decoder implemention this may be slower then just use the {@link #MERGE_CUMULATOR}. + */ + public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { + ByteBuf buffer; + if (cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user + // use slice().retain() or duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, cumulation, in.readableBytes()); + buffer.writeBytes(in); + in.release(); + } else { + CompositeByteBuf composite; + if (cumulation instanceof CompositeByteBuf) { + composite = (CompositeByteBuf) cumulation; + } else { + int readable = cumulation.readableBytes(); + composite = alloc.compositeBuffer(); + composite.addComponent(cumulation).writerIndex(readable); + } + composite.addComponent(in).writerIndex(composite.writerIndex() + in.readableBytes()); + buffer = composite; + } + return buffer; + } + }; + ByteBuf cumulation; + private Cumulator cumulator = MERGE_CUMULATOR; private boolean singleDecode; private boolean decodeWasNull; private boolean first; @@ -96,6 +160,16 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter return singleDecode; } + /** + * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s. + */ + public void setCumulator(Cumulator cumulator) { + if (cumulator == null) { + throw new NullPointerException("cumulator"); + } + this.cumulator = cumulator; + } + /** * Returns the actual number of readable bytes in the internal cumulative * buffer of this decoder. You usually do not need to rely on this value @@ -151,19 +225,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter if (first) { cumulation = data; } else { - if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes() - || cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when either there is not more room in the buffer - // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or - // duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - expandCumulation(ctx, data.readableBytes()); - } - cumulation.writeBytes(data); - data.release(); + cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out); } catch (DecoderException e) { @@ -188,13 +250,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } } - private void expandCumulation(ChannelHandlerContext ctx, int readable) { - ByteBuf oldCumulation = cumulation; - cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable); - cumulation.writeBytes(oldCumulation); - oldCumulation.release(); - } - @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { if (cumulation != null && !first && cumulation.refCnt() == 1) { @@ -322,4 +377,24 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List