fix remove handler cause ByteToMessageDecoder out disorder (#9670)
Motivation: Data flowing in from the decoder flows out in sequence,Whether decoder removed or not. Modification: fire data in out and clear out when hander removed before call method handlerRemoved(ctx) Result: Fixes #9668 .
This commit is contained in:
parent
95230e01da
commit
e745ef0645
@ -81,7 +81,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
try {
|
try {
|
||||||
final ByteBuf buffer;
|
final ByteBuf buffer;
|
||||||
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|
||||||
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
|
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
|
||||||
// Expand cumulation (by replace it) when either there is not more room in the buffer
|
// Expand cumulation (by replace it) when either there is not more room in the buffer
|
||||||
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
|
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
|
||||||
// duplicate().retain() or if its read-only.
|
// duplicate().retain() or if its read-only.
|
||||||
@ -505,6 +505,8 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
|
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
|
||||||
decodeState = STATE_INIT;
|
decodeState = STATE_INIT;
|
||||||
if (removePending) {
|
if (removePending) {
|
||||||
|
fireChannelRead(ctx, out, out.size());
|
||||||
|
out.clear();
|
||||||
handlerRemoved(ctx);
|
handlerRemoved(ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -399,4 +399,31 @@ public class ByteToMessageDecoderTest {
|
|||||||
}
|
}
|
||||||
assertFalse(channel.finish());
|
assertFalse(channel.finish());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDisorder() {
|
||||||
|
ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
|
||||||
|
int count;
|
||||||
|
|
||||||
|
//read 4 byte then remove this decoder
|
||||||
|
@Override
|
||||||
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
|
||||||
|
out.add(in.readByte());
|
||||||
|
if (++count >= 4) {
|
||||||
|
ctx.pipeline().remove(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel(decoder);
|
||||||
|
assertTrue(channel.writeInbound(Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4, 5})));
|
||||||
|
assertEquals((byte) 1, channel.readInbound());
|
||||||
|
assertEquals((byte) 2, channel.readInbound());
|
||||||
|
assertEquals((byte) 3, channel.readInbound());
|
||||||
|
assertEquals((byte) 4, channel.readInbound());
|
||||||
|
ByteBuf buffer5 = channel.readInbound();
|
||||||
|
assertEquals((byte) 5, buffer5.readByte());
|
||||||
|
assertFalse(buffer5.isReadable());
|
||||||
|
assertTrue(buffer5.release());
|
||||||
|
assertFalse(channel.finish());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user