diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 11b9ad9f4d..d32d0f9ae1 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -402,7 +402,14 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter void channelInputClosed(ChannelHandlerContext ctx, List out) throws Exception { if (cumulation != null) { callDecode(ctx, cumulation, out); - decodeLast(ctx, cumulation, out); + // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would + // be unexpected. + if (!ctx.isRemoved()) { + // Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...). + // See https://github.com/netty/netty/issues/10802. + ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation; + decodeLast(ctx, buffer, out); + } } else { decodeLast(ctx, Unpooled.EMPTY_BUFFER, out); } diff --git a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java index 0d36e70435..df015a153a 100644 --- a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java @@ -26,12 +26,14 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.util.internal.PlatformDependent; import org.junit.Test; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; import static io.netty.buffer.Unpooled.wrappedBuffer; import static org.junit.Assert.assertEquals; @@ -510,4 +512,30 @@ public class ByteToMessageDecoderTest { assertTrue(buffer5.release()); assertFalse(channel.finish()); } + + @Test + public void testDecodeLast() { + final AtomicBoolean removeHandler = new AtomicBoolean(); + EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { + if (removeHandler.get()) { + ctx.pipeline().remove(this); + } + } + }); + byte[] bytes = new byte[1024]; + PlatformDependent.threadLocalRandom().nextBytes(bytes); + + assertFalse(channel.writeInbound(Unpooled.copiedBuffer(bytes))); + assertNull(channel.readInbound()); + removeHandler.set(true); + // This should trigger channelInputClosed(...) + channel.pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + + assertTrue(channel.finish()); + assertBuffer(Unpooled.wrappedBuffer(bytes), (ByteBuf) channel.readInbound()); + assertNull(channel.readInbound()); + } }