Fix NPE in ByteToMessageDecoder if the user removes the handler while channelInputClosed(...) is processing the buffer. (#10817)

Motivation:

We need to carefully check for null before we pass the cumulation buffer into decodeLast as callDecode(...) may have removed the codec already and so set cumulation to null.

Modifications:

- Check for null and if we see null use Unpooled.EMPTY_BUFFEr
- Only call decodeLast(...) if callDecode(...) didnt remove the handler yet.

Result:

Fixes https://github.com/netty/netty/issues/10802
This commit is contained in:
Norman Maurer 2020-11-24 14:08:32 +01:00
parent aab4c0c78a
commit 7d53b97c0f
2 changed files with 38 additions and 2 deletions

View File

@ -336,6 +336,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
if (evt instanceof ChannelInputShutdownEvent) { if (evt instanceof ChannelInputShutdownEvent) {
// The decodeLast method is invoked when a channelInactive event is encountered. // The decodeLast method is invoked when a channelInactive event is encountered.
// This method is responsible for ending requests in some situations and must be called // This method is responsible for ending requests in some situations and must be called
@ -343,7 +344,6 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
assert context.ctx == ctx || ctx == context; assert context.ctx == ctx || ctx == context;
channelInputClosed(context, false); channelInputClosed(context, false);
} }
ctx.fireUserEventTriggered(evt);
} }
private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) { private void channelInputClosed(ByteToMessageDecoderContext ctx, boolean callChannelInactive) {
@ -376,7 +376,14 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception { void channelInputClosed(ByteToMessageDecoderContext ctx) throws Exception {
if (cumulation != null) { if (cumulation != null) {
callDecode(ctx, cumulation); callDecode(ctx, cumulation);
decodeLast(ctx, cumulation); // If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
// be unexpected.
if (!ctx.isRemoved()) {
// Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
// See https://github.com/netty/netty/issues/10802.
ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
decodeLast(ctx, buffer);
}
} else { } else {
decodeLast(ctx, Unpooled.EMPTY_BUFFER); decodeLast(ctx, Unpooled.EMPTY_BUFFER);
} }

View File

@ -25,11 +25,14 @@ import io.netty.buffer.UnpooledHeapByteBuf;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.buffer.Unpooled.wrappedBuffer; import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -507,4 +510,30 @@ public class ByteToMessageDecoderTest {
assertTrue(buffer5.release()); assertTrue(buffer5.release());
assertFalse(channel.finish()); assertFalse(channel.finish());
} }
@Test
public void testDecodeLast() {
final AtomicBoolean removeHandler = new AtomicBoolean();
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in) {
if (removeHandler.get()) {
ctx.pipeline().remove(this);
}
}
});
byte[] bytes = new byte[1024];
ThreadLocalRandom.current().nextBytes(bytes);
assertFalse(channel.writeInbound(Unpooled.copiedBuffer(bytes)));
assertNull(channel.readInbound());
removeHandler.set(true);
// This should trigger channelInputClosed(...)
channel.pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
assertTrue(channel.finish());
assertBuffer(Unpooled.wrappedBuffer(bytes), channel.readInbound());
assertNull(channel.readInbound());
}
} }