Made sure to clean up the cumulative buffer on channelDisconnected or channelClosed

This commit is contained in:
Trustin Lee 2009-06-04 08:49:33 +00:00
parent d4071e87ef
commit 345a5512ab
2 changed files with 27 additions and 17 deletions

View File

@ -293,8 +293,12 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
ChannelBuffer cumulation = cumulation(ctx);
try { try {
ChannelBuffer cumulation = this.cumulation.getAndSet(null);
if (cumulation == null) {
return;
}
if (cumulation.readable()) { if (cumulation.readable()) {
// Make sure all frames are read before notifying a closed channel. // Make sure all frames are read before notifying a closed channel.
callDecode(ctx, ctx.getChannel(), cumulation, null); callDecode(ctx, ctx.getChannel(), cumulation, null);

View File

@ -291,7 +291,12 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
* Stores the internal cumulative buffer's reader position. * Stores the internal cumulative buffer's reader position.
*/ */
protected void checkpoint() { protected void checkpoint() {
checkpoint = cumulation().readerIndex(); ChannelBuffer cumulation = this.cumulation.get();
if (cumulation != null) {
checkpoint = cumulation.readerIndex();
} else {
checkpoint = -1; // buffer not available (already cleaned up)
}
} }
/** /**
@ -299,8 +304,8 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
* the current decoder state. * the current decoder state.
*/ */
protected void checkpoint(T state) { protected void checkpoint(T state) {
checkpoint = cumulation().readerIndex(); checkpoint();
this.state = state; setState(state);
} }
/** /**
@ -414,7 +419,13 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
} }
} catch (ReplayError replay) { } catch (ReplayError replay) {
// Return to the checkpoint (or oldPosition) and retry. // Return to the checkpoint (or oldPosition) and retry.
cumulation.readerIndex(checkpoint); int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
cumulation.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
} }
if (result == null) { if (result == null) {
@ -460,9 +471,14 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
ChannelBuffer cumulation = cumulation(ctx);
replayable.terminate();
try { try {
ChannelBuffer cumulation = this.cumulation.getAndSet(null);
if (cumulation == null) {
return;
}
replayable.terminate();
if (cumulation.readable()) { if (cumulation.readable()) {
// Make sure all data was read before notifying a closed channel. // Make sure all data was read before notifying a closed channel.
callDecode(ctx, e.getChannel(), cumulation, null); callDecode(ctx, e.getChannel(), cumulation, null);
@ -482,7 +498,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
} }
} }
private ChannelBuffer cumulation(ChannelHandlerContext ctx) { private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
ChannelBuffer buf = cumulation.get(); ChannelBuffer buf = cumulation.get();
if (buf == null) { if (buf == null) {
@ -496,13 +511,4 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
} }
return buf; return buf;
} }
private ChannelBuffer cumulation() {
ChannelBuffer cumulation = this.cumulation.get();
if (cumulation == null) {
throw new IllegalStateException(
"checkpoint() should be called in decode() only");
}
return cumulation;
}
} }