diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 340f2ce32f..5ce3350871 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -121,55 +121,62 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof ByteBuf) { - RecyclableArrayList out = RecyclableArrayList.newInstance(); - try { + RecyclableArrayList out = RecyclableArrayList.newInstance(); + try { + if (msg instanceof ByteBuf) { ByteBuf data = (ByteBuf) msg; if (cumulation == null) { cumulation = data; + try { + callDecode(ctx, cumulation, out); + } finally { + if (cumulation != null && !cumulation.isReadable()) { + cumulation.release(); + cumulation = null; + } + } } else { - if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { - expandCumulation(ctx, data.readableBytes()); - } - cumulation.writeBytes(data); - data.release(); - } - callDecode(ctx, cumulation, out); - } catch (DecoderException e) { - throw e; - } catch (Throwable t) { - throw new DecoderException(t); - } finally { - if (cumulation != null) { - if (!cumulation.isReadable()) { - cumulation.release(); - cumulation = null; - } else { - cumulation.discardSomeReadBytes(); + try { + if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { + ByteBuf oldCumulation = cumulation; + cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + data.readableBytes()); + cumulation.writeBytes(oldCumulation); + oldCumulation.release(); + } + cumulation.writeBytes(data); + callDecode(ctx, cumulation, out); + } finally { + if (cumulation != null) { + if (!cumulation.isReadable()) { + cumulation.release(); + cumulation = null; + } else { + cumulation.discardSomeReadBytes(); + } + } + data.release(); } } - int size = out.size(); - if (size == 0) { - decodeWasNull = true; - } else { - for (int i = 0; i < size; i ++) { - ctx.fireChannelRead(out.get(i)); - } - } - out.recycle(); + } else { + out.add(msg); } - } else { - ctx.fireChannelRead(msg); + } catch (DecoderException e) { + throw e; + } catch (Throwable t) { + throw new DecoderException(t); + } finally { + int size = out.size(); + if (size == 0) { + decodeWasNull = true; + } else { + for (int i = 0; i < size; i ++) { + ctx.fireChannelRead(out.get(i)); + } + } + out.recycle(); } } - private void expandCumulation(ChannelHandlerContext ctx, int readable) { - ByteBuf oldCumulation = cumulation; - cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable); - cumulation.writeBytes(oldCumulation); - oldCumulation.release(); - } - @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { if (decodeWasNull) {