diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java index 36bd3c609a..626ca84c8f 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java @@ -293,8 +293,12 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ChannelBuffer cumulation = cumulation(ctx); try { + ChannelBuffer cumulation = this.cumulation.getAndSet(null); + if (cumulation == null) { + return; + } + if (cumulation.readable()) { // Make sure all frames are read before notifying a closed channel. callDecode(ctx, ctx.getChannel(), cumulation, null); diff --git a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java index 5f8d2f4f86..382b61ad95 100644 --- a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java @@ -291,7 +291,12 @@ public abstract class ReplayingDecoder> * Stores the internal cumulative buffer's reader position. */ protected void checkpoint() { - checkpoint = cumulation().readerIndex(); + ChannelBuffer cumulation = this.cumulation.get(); + if (cumulation != null) { + checkpoint = cumulation.readerIndex(); + } else { + checkpoint = -1; // buffer not available (already cleaned up) + } } /** @@ -299,8 +304,8 @@ public abstract class ReplayingDecoder> * the current decoder state. */ protected void checkpoint(T state) { - checkpoint = cumulation().readerIndex(); - this.state = state; + checkpoint(); + setState(state); } /** @@ -414,7 +419,13 @@ public abstract class ReplayingDecoder> } } catch (ReplayError replay) { // Return to the checkpoint (or oldPosition) and retry. - cumulation.readerIndex(checkpoint); + int checkpoint = this.checkpoint; + if (checkpoint >= 0) { + cumulation.readerIndex(checkpoint); + } else { + // Called by cleanup() - no need to maintain the readerIndex + // anymore because the buffer has been released already. + } } if (result == null) { @@ -460,9 +471,14 @@ public abstract class ReplayingDecoder> private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - ChannelBuffer cumulation = cumulation(ctx); - replayable.terminate(); try { + ChannelBuffer cumulation = this.cumulation.getAndSet(null); + if (cumulation == null) { + return; + } + + replayable.terminate(); + if (cumulation.readable()) { // Make sure all data was read before notifying a closed channel. callDecode(ctx, e.getChannel(), cumulation, null); @@ -482,7 +498,6 @@ public abstract class ReplayingDecoder> } } - private ChannelBuffer cumulation(ChannelHandlerContext ctx) { ChannelBuffer buf = cumulation.get(); if (buf == null) { @@ -496,13 +511,4 @@ public abstract class ReplayingDecoder> } return buf; } - - private ChannelBuffer cumulation() { - ChannelBuffer cumulation = this.cumulation.get(); - if (cumulation == null) { - throw new IllegalStateException( - "checkpoint() should be called in decode() only"); - } - return cumulation; - } }