Always cleanup() in ReplayingDecoder if we have received any messages at
all. See #105
This commit is contained in:
parent
4b9376d3c8
commit
0b77d89004
@ -17,6 +17,7 @@ package org.jboss.netty.handler.codec.replay;
|
|||||||
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.buffer.ChannelBufferFactory;
|
import org.jboss.netty.buffer.ChannelBufferFactory;
|
||||||
@ -297,6 +298,8 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
|
|
||||||
private final AtomicReference<ChannelBuffer> cumulation =
|
private final AtomicReference<ChannelBuffer> cumulation =
|
||||||
new AtomicReference<ChannelBuffer>();
|
new AtomicReference<ChannelBuffer>();
|
||||||
|
private final AtomicBoolean needsCleanup =
|
||||||
|
new AtomicBoolean(false);
|
||||||
private final boolean unfold;
|
private final boolean unfold;
|
||||||
private ReplayingDecoderBuffer replayable;
|
private ReplayingDecoderBuffer replayable;
|
||||||
private T state;
|
private T state;
|
||||||
@ -438,6 +441,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
}
|
}
|
||||||
|
|
||||||
ChannelBuffer cumulation = cumulation(ctx);
|
ChannelBuffer cumulation = cumulation(ctx);
|
||||||
|
needsCleanup.set(true);
|
||||||
cumulation.discardReadBytes();
|
cumulation.discardReadBytes();
|
||||||
cumulation.writeBytes(input);
|
cumulation.writeBytes(input);
|
||||||
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
|
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
|
||||||
@ -534,14 +538,14 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
|
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
try {
|
try {
|
||||||
ChannelBuffer cumulation = this.cumulation.getAndSet(null);
|
if (!needsCleanup.getAndSet(false)) {
|
||||||
if (cumulation == null) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ChannelBuffer cumulation = this.cumulation.getAndSet(null);
|
||||||
replayable.terminate();
|
replayable.terminate();
|
||||||
|
|
||||||
if (cumulation.readable()) {
|
if (cumulation != null && cumulation.readable()) {
|
||||||
// Make sure all data was read before notifying a closed channel.
|
// Make sure all data was read before notifying a closed channel.
|
||||||
callDecode(ctx, e.getChannel(), cumulation, null);
|
callDecode(ctx, e.getChannel(), cumulation, null);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user