diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 5ce3350871..a40bdb196c 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -49,6 +49,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter ByteBuf cumulation; private boolean singleDecode; private boolean decodeWasNull; + private boolean first; protected ByteToMessageDecoder() { if (getClass().isAnnotationPresent(Sharable.class)) { @@ -121,64 +122,57 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - RecyclableArrayList out = RecyclableArrayList.newInstance(); - try { - if (msg instanceof ByteBuf) { + if (msg instanceof ByteBuf) { + RecyclableArrayList out = RecyclableArrayList.newInstance(); + try { ByteBuf data = (ByteBuf) msg; - if (cumulation == null) { + first = cumulation == null; + if (first) { cumulation = data; - try { - callDecode(ctx, cumulation, out); - } finally { - if (cumulation != null && !cumulation.isReadable()) { - cumulation.release(); - cumulation = null; - } - } } else { - try { - if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { - ByteBuf oldCumulation = cumulation; - cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + data.readableBytes()); - cumulation.writeBytes(oldCumulation); - oldCumulation.release(); - } - cumulation.writeBytes(data); - callDecode(ctx, cumulation, out); - } finally { - if (cumulation != null) { - if (!cumulation.isReadable()) { - cumulation.release(); - cumulation = null; - } else { - cumulation.discardSomeReadBytes(); - } - } - data.release(); + if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { + expandCumulation(ctx, data.readableBytes()); } + cumulation.writeBytes(data); + data.release(); } - } else { - out.add(msg); - } - } catch (DecoderException e) { - throw e; - } catch (Throwable t) { - throw new DecoderException(t); - } finally { - int size = out.size(); - if (size == 0) { - decodeWasNull = true; - } else { + callDecode(ctx, cumulation, out); + } catch (DecoderException e) { + throw e; + } catch (Throwable t) { + throw new DecoderException(t); + } finally { + if (cumulation != null && !cumulation.isReadable()) { + cumulation.release(); + cumulation = null; + } + int size = out.size(); + decodeWasNull = size == 0; + for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } + out.recycle(); } - out.recycle(); + } else { + ctx.fireChannelRead(msg); } } + private void expandCumulation(ChannelHandlerContext ctx, int readable) { + ByteBuf oldCumulation = cumulation; + cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable); + cumulation.writeBytes(oldCumulation); + oldCumulation.release(); + } + @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (cumulation != null && !first) { + // discard some bytes if possible to make more room in the + // buffer + cumulation.discardSomeReadBytes(); + } if (decodeWasNull) { decodeWasNull = false; if (!ctx.channel().config().isAutoRead()) {