Correctly handle ChannelInputShutdownEvent in ReplayingDecoder
Motivation:
b112673554
added ChannelInputShutdownEvent support to ByteToMessageDecoder but missed updating the code for ReplayingDecoder. This has the effect:
- If a ChannelInputShutdownEvent is fired ByteToMessageDecoder (the super-class of ReplayingDecoder) will call the channelInputClosed(...) method which will pass the incorrect buffer to the decode method of ReplayingDecoder.
Modifications:
Share more code between ByteToMessageDEcoder and ReplayingDecoder and so also support ChannelInputShutdownEvent correctly in ReplayingDecoder
Result:
ChannelInputShutdownEvent is corrrectly handle in ReplayingDecoder as well.
This commit is contained in:
parent
7c734fcf73
commit
6108b7297b
@ -323,12 +323,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
|
||||
RecyclableArrayList out = RecyclableArrayList.newInstance();
|
||||
try {
|
||||
if (cumulation != null) {
|
||||
callDecode(ctx, cumulation, out);
|
||||
decodeLast(ctx, cumulation, out);
|
||||
} else {
|
||||
decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
|
||||
}
|
||||
channelInputClosed(ctx, out);
|
||||
} catch (DecoderException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
@ -355,6 +350,19 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the input of the channel was closed which may be because it changed to inactive or because of
|
||||
* {@link ChannelInputShutdownEvent}.
|
||||
*/
|
||||
void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
|
||||
if (cumulation != null) {
|
||||
callDecode(ctx, cumulation, out);
|
||||
decodeLast(ctx, cumulation, out);
|
||||
} else {
|
||||
decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
|
||||
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
|
||||
|
@ -16,6 +16,7 @@
|
||||
package io.netty.handler.codec;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
@ -322,37 +323,19 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
RecyclableArrayList out = RecyclableArrayList.newInstance();
|
||||
final void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
|
||||
try {
|
||||
replayable.terminate();
|
||||
if (cumulation != null) {
|
||||
callDecode(ctx, internalBuffer(), out);
|
||||
decodeLast(ctx, replayable, out);
|
||||
} else {
|
||||
replayable.setCumulation(Unpooled.EMPTY_BUFFER);
|
||||
decodeLast(ctx, replayable, out);
|
||||
}
|
||||
} catch (Signal replay) {
|
||||
// Ignore
|
||||
replay.expect(REPLAY);
|
||||
} catch (DecoderException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new DecoderException(e);
|
||||
} finally {
|
||||
try {
|
||||
if (cumulation != null) {
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
}
|
||||
|
||||
int size = out.size();
|
||||
if (size > 0) {
|
||||
fireChannelRead(ctx, out, size);
|
||||
// Something was read, call fireChannelReadComplete()
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
ctx.fireChannelInactive();
|
||||
} finally {
|
||||
// recycle in all cases
|
||||
out.recycle();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,11 +20,13 @@ import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static io.netty.util.ReferenceCountUtil.*;
|
||||
import static org.junit.Assert.*;
|
||||
@ -225,4 +227,39 @@ public class ReplayingDecoderTest {
|
||||
assertEquals(3, (int) queue.take());
|
||||
assertTrue(queue.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChannelInputShutdownEvent() {
|
||||
final AtomicReference<Error> error = new AtomicReference<Error>();
|
||||
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ReplayingDecoder<Integer>(0) {
|
||||
private boolean decoded;
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
if (!(in instanceof ReplayingDecoderByteBuf)) {
|
||||
error.set(new AssertionError("in must be of type " + ReplayingDecoderByteBuf.class
|
||||
+ " but was " + in.getClass()));
|
||||
return;
|
||||
}
|
||||
if (!decoded) {
|
||||
decoded = true;
|
||||
in.readByte();
|
||||
state(1);
|
||||
} else {
|
||||
// This will throw an ReplayingError
|
||||
in.skipBytes(Integer.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
assertFalse(channel.writeInbound(Unpooled.wrappedBuffer(new byte[] {0, 1})));
|
||||
channel.pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||
assertFalse(channel.finishAndReleaseAll());
|
||||
|
||||
Error err = error.get();
|
||||
if (err != null) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user