diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 04b0fed5f7..ad971ffa7a 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -45,18 +45,23 @@ public abstract class ByteToMessageDecoder private volatile boolean singleDecode; private boolean decodeWasNull; - private MessageBuf decoderOutput; + + private static final ThreadLocal> decoderOutput = + new ThreadLocal>() { + @Override + protected MessageBuf initialValue() { + return Unpooled.messageBuffer(); + } + }; @Override public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { - decoderOutput = Unpooled.messageBuffer(); return super.newInboundBuffer(ctx); } @Override public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { super.freeInboundBuffer(ctx); - decoderOutput.release(); } /** @@ -130,64 +135,60 @@ public abstract class ByteToMessageDecoder protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) { boolean wasNull = false; - boolean decoded = false; MessageBuf out = decoderOutput(); assert out.isEmpty(); - while (in.isReadable()) { - try { - int outSize = out.size(); - int oldInputLength = in.readableBytes(); - decode(ctx, in, out); - if (outSize == out.size()) { - wasNull = true; + try { + while (in.isReadable()) { + try { + int outSize = out.size(); + int oldInputLength = in.readableBytes(); + decode(ctx, in, out); + if (outSize == out.size()) { + wasNull = true; + if (oldInputLength == in.readableBytes()) { + break; + } else { + continue; + } + } + + wasNull = false; if (oldInputLength == in.readableBytes()) { + throw new IllegalStateException( + "decode() did not read anything but decoded a message."); + } + + if (isSingleDecode()) { break; + } + } catch (Throwable t) { + if (t instanceof CodecException) { + throw (CodecException) t; } else { - continue; + throw new DecoderException(t); } } - - wasNull = false; - if (oldInputLength == in.readableBytes()) { - throw new IllegalStateException( - "decode() did not read anything but decoded a message."); - } - - if (isSingleDecode()) { - break; - } - } catch (Throwable t) { - if (t instanceof CodecException) { - throw (CodecException) t; - } else { - throw new DecoderException(t); - } - } finally { - for (;;) { - Object msg = out.poll(); - if (msg == null) { - break; - } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); - } - - if (decoded) { - decoded = false; - ctx.fireInboundBufferUpdated(); - } } - } + } finally { + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } - if (decoded) { - decodeWasNull = false; - ctx.fireInboundBufferUpdated(); - } else { - if (wasNull) { - decodeWasNull = true; + if (decoded) { + decodeWasNull = false; + ctx.fireInboundBufferUpdated(); + } else { + if (wasNull) { + decodeWasNull = true; + } } } } @@ -217,7 +218,7 @@ public abstract class ByteToMessageDecoder } final MessageBuf decoderOutput() { - return decoderOutput; + return decoderOutput.get(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index f5704c1e9b..a66e6f9b91 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -406,77 +406,75 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { @Override protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) { boolean wasNull = false; - ByteBuf in = cumulation; MessageBuf out = decoderOutput(); boolean decoded = false; - while (in.isReadable()) { - try { - int oldReaderIndex = checkpoint = in.readerIndex(); - int outSize = out.size(); - S oldState = state; + + assert out.isEmpty(); + + try { + while (in.isReadable()) { try { - decode(ctx, replayable, out); - if (outSize == out.size()) { - wasNull = true; - if (oldReaderIndex == in.readerIndex() && oldState == state) { - throw new IllegalStateException( - "null cannot be returned if no data is consumed and state didn't change."); - } else { - // Previous data has been discarded or caused state transition. - // Probably it is reading on. - continue; + int oldReaderIndex = checkpoint = in.readerIndex(); + int outSize = out.size(); + S oldState = state; + try { + decode(ctx, replayable, out); + if (outSize == out.size()) { + wasNull = true; + if (oldReaderIndex == in.readerIndex() && oldState == state) { + throw new IllegalStateException( + "null cannot be returned if no data is consumed and state didn't change."); + } else { + // Previous data has been discarded or caused state transition. + // Probably it is reading on. + continue; + } + } + } catch (Signal replay) { + replay.expect(REPLAY); + // Return to the checkpoint (or oldPosition) and retry. + int checkpoint = this.checkpoint; + if (checkpoint >= 0) { + in.readerIndex(checkpoint); + } else { + // Called by cleanup() - no need to maintain the readerIndex + // anymore because the buffer has been released already. } - } - } catch (Signal replay) { - replay.expect(REPLAY); - // Return to the checkpoint (or oldPosition) and retry. - int checkpoint = this.checkpoint; - if (checkpoint >= 0) { - in.readerIndex(checkpoint); - } else { - // Called by cleanup() - no need to maintain the readerIndex - // anymore because the buffer has been released already. - } - break; - } - wasNull = false; - - if (oldReaderIndex == in.readerIndex() && oldState == state) { - throw new IllegalStateException( - "decode() method must consume at least one byte " + - "if it returned a decoded message (caused by: " + - getClass() + ')'); - } - } catch (Throwable t) { - if (t instanceof CodecException) { - throw (CodecException) t; - } else { - throw new DecoderException(t); - } - } finally { - - for (;;) { - Object msg = out.poll(); - if (msg == null) { break; } - decoded = true; - ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); - } - if (decoded) { - decoded = false; - ctx.fireInboundBufferUpdated(); + wasNull = false; + + if (oldReaderIndex == in.readerIndex() && oldState == state) { + throw new IllegalStateException( + "decode() method must consume at least one byte " + + "if it returned a decoded message (caused by: " + + getClass() + ')'); + } + } catch (Throwable t) { + if (t instanceof CodecException) { + throw (CodecException) t; + } else { + throw new DecoderException(t); + } } } - } - - if (decoded) { - decodeWasNull = false; - ctx.fireInboundBufferUpdated(); - } else { - if (wasNull) { - decodeWasNull = true; + } finally { + for (;;) { + Object msg = out.poll(); + if (msg == null) { + break; + } + decoded = true; + ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg); + } + if (decoded) { + decodeWasNull = false; + ctx.fireInboundBufferUpdated(); + } else { + if (wasNull) { + decodeWasNull = true; + } } } }