diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 08d613e157..7755aa7322 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -16,6 +16,8 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; @@ -62,13 +64,75 @@ import java.util.List; * Be aware that sub-classes of {@link ByteToMessageDecoder} MUST NOT * annotated with {@link @Sharable}. *

- * Some methods such as {@link ByteBuf.readBytes(int)} will cause a memory leak if the returned buffer - * is not released or added to the out {@link List}. Use derived buffers like {@link ByteBuf.readSlice(int)} + * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer + * is not released or added to the out {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)} * to avoid leaking memory. */ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { + /** + * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies. + */ + public static final Cumulator MERGE_CUMULATOR = new Cumulator() { + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { + ByteBuf buffer; + if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() + || cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when either there is not more room in the buffer + // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or + // duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, cumulation, in.readableBytes()); + } else { + buffer = cumulation; + } + buffer.writeBytes(in); + in.release(); + return buffer; + } + }; + + /** + * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible. + * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case + * and the decoder implemention this may be slower then just use the {@link #MERGE_CUMULATOR}. + */ + public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { + ByteBuf buffer; + if (cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user + // use slice().retain() or duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, cumulation, in.readableBytes()); + buffer.writeBytes(in); + in.release(); + } else { + CompositeByteBuf composite; + if (cumulation instanceof CompositeByteBuf) { + composite = (CompositeByteBuf) cumulation; + } else { + int readable = cumulation.readableBytes(); + composite = alloc.compositeBuffer(); + composite.addComponent(cumulation).writerIndex(readable); + } + composite.addComponent(in).writerIndex(composite.writerIndex() + in.readableBytes()); + buffer = composite; + } + return buffer; + } + }; + ByteBuf cumulation; + private Cumulator cumulator = MERGE_CUMULATOR; private boolean singleDecode; private boolean decodeWasNull; private boolean first; @@ -97,6 +161,16 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { return singleDecode; } + /** + * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s. + */ + public void setCumulator(Cumulator cumulator) { + if (cumulator == null) { + throw new NullPointerException("cumulator"); + } + this.cumulator = cumulator; + } + /** * Returns the actual number of readable bytes in the internal cumulative * buffer of this decoder. You usually do not need to rely on this value @@ -152,19 +226,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { if (first) { cumulation = data; } else { - if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes() - || cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when either there is not more room in the buffer - // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or - // duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - expandCumulation(ctx, data.readableBytes()); - } - cumulation.writeBytes(data); - data.release(); + cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out); } catch (DecoderException e) { @@ -189,13 +251,6 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { } } - private void expandCumulation(ChannelHandlerContext ctx, int readable) { - ByteBuf oldCumulation = cumulation; - cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable); - cumulation.writeBytes(oldCumulation); - oldCumulation.release(); - } - @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { if (cumulation != null && !first && cumulation.refCnt() == 1) { @@ -323,4 +378,24 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { decode(ctx, in, out); } + + static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { + ByteBuf oldCumulation = cumulation; + cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); + cumulation.writeBytes(oldCumulation); + oldCumulation.release(); + return cumulation; + } + + /** + * Cumulate {@link ByteBuf}s. + */ + public interface Cumulator { + /** + * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes. + * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so + * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed. + */ + ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in); + } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 6f7d5a138b..c843d38ab6 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -434,7 +434,6 @@ public class SslHandler extends ByteToMessageDecoder { } SSLEngineResult result = wrap(alloc, engine, buf, out); - if (!buf.isReadable()) { promise = pendingUnencryptedWrites.remove(); } else {