diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index c5e0824f82..fd0d0202bf 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -336,6 +336,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + ctx.fireUserEventTriggered(evt); if (evt instanceof ChannelInputShutdownEvent) { // The decodeLast method is invoked when a channelInactive event is encountered. // This method is responsible for ending requests in some situations and must be called @@ -343,7 +344,6 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { assert context.ctx == ctx || ctx == context; channelInputClosed(context, false); } - ctx.fireUserEventTriggered(evt); } private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) { @@ -376,7 +376,14 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception { if (cumulation != null) { callDecode(ctx, cumulation); - decodeLast(ctx, cumulation); + // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would + // be unexpected. + if (!ctx.isRemoved()) { + // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...). + // See https://github.com/netty/netty/issues/10802. + ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation; + decodeLast(ctx, buffer); + } } else { decodeLast(ctx, Unpooled.EMPTY_BUFFER); } diff --git a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java index b9980b17b9..0dd736677e 100644 --- a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java @@ -25,11 +25,14 @@ import io.netty.buffer.UnpooledHeapByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedChannel; + +import io.netty.channel.socket.ChannelInputShutdownEvent; import org.junit.Test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import static io.netty.buffer.Unpooled.wrappedBuffer; import static org.junit.Assert.assertEquals; @@ -507,4 +510,30 @@ public class ByteToMessageDecoderTest { assertTrue(buffer5.release()); assertFalse(channel.finish()); } + + @Test + public void testDecodeLast() { + final AtomicBoolean removeHandler = new AtomicBoolean(); + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in) { + if (removeHandler.get()) { + ctx.pipeline().remove(this); + } + } + }); + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + + assertFalse(channel.writeInbound(Unpooled.copiedBuffer(bytes))); + assertNull(channel.readInbound()); + removeHandler.set(true); + // This should trigger channelInputClosed(...) + channel.pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + + assertTrue(channel.finish()); + assertBuffer(Unpooled.wrappedBuffer(bytes), channel.readInbound()); + assertNull(channel.readInbound()); + } }