diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index e647411fe6..b1f91ea2bc 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -261,11 +261,14 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { message = Unpooled.EMPTY_BUFFER; } + if (endOfInput) { + // We need to remove the element from the queue before we call writeAndFlush() as this operation + // may cause an action that also touches the queue. + queue.remove(); + } // Flush each chunk to conserve memory ChannelFuture f = ctx.writeAndFlush(message); if (endOfInput) { - queue.remove(); - if (f.isDone()) { handleEndOfInputFuture(f, currentWrite); } else { diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java index 780fec6fe2..932c3a4367 100644 --- a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java @@ -585,6 +585,55 @@ public class ChunkedWriteHandlerTest { assertFalse(ch.finish()); } + @Test + public void testEndOfInputWhenChannelIsClosedwhenWrite() { + ChunkedInput input = new ChunkedInput() { + + @Override + public boolean isEndOfInput() { + return true; + } + + @Override + public void close() { + } + + @Deprecated + @Override + public ByteBuf readChunk(ChannelHandlerContext ctx) { + return null; + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) { + return null; + } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return 1; + } + }; + + EmbeddedChannel ch = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + ReferenceCountUtil.release(msg); + // Calling close so we will drop all queued messages in the ChunkedWriteHandler. + ctx.close(); + promise.setSuccess(); + } + }, new ChunkedWriteHandler()); + + ch.writeAndFlush(input).syncUninterruptibly(); + assertFalse(ch.finishAndReleaseAll()); + } + @Test public void testWriteListenerInvokedAfterChannelClosedAndInputNotFullyConsumed() throws Exception { // use non-empty input which has endOfInput = false