From 18c66a9d70e60d079f8ca1e1f265f80efbde10af Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 2 Mar 2021 21:56:59 +0100 Subject: [PATCH] Ensure removal from queue happens before writeAndFlush(...) is called (#11049) Motivation: We need to ensure that we call queue.remove() before we cal writeAndFlush() as this operation may cause an event that also touches the queue and remove from it. If we miss to do so we may see NoSuchElementExceptions. Modifications: - Call queue.remove() before calling writeAndFlush(...) - Add unit test Result: Fixes https://github.com/netty/netty/issues/11046 --- .../handler/stream/ChunkedWriteHandler.java | 7 ++- .../stream/ChunkedWriteHandlerTest.java | 49 +++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index e647411fe6..b1f91ea2bc 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -261,11 +261,14 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { message = Unpooled.EMPTY_BUFFER; } + if (endOfInput) { + // We need to remove the element from the queue before we call writeAndFlush() as this operation + // may cause an action that also touches the queue. + queue.remove(); + } // Flush each chunk to conserve memory ChannelFuture f = ctx.writeAndFlush(message); if (endOfInput) { - queue.remove(); - if (f.isDone()) { handleEndOfInputFuture(f, currentWrite); } else { diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java index 780fec6fe2..932c3a4367 100644 --- a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java @@ -585,6 +585,55 @@ public class ChunkedWriteHandlerTest { assertFalse(ch.finish()); } + @Test + public void testEndOfInputWhenChannelIsClosedwhenWrite() { + ChunkedInput input = new ChunkedInput() { + + @Override + public boolean isEndOfInput() { + return true; + } + + @Override + public void close() { + } + + @Deprecated + @Override + public ByteBuf readChunk(ChannelHandlerContext ctx) { + return null; + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) { + return null; + } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return 1; + } + }; + + EmbeddedChannel ch = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + ReferenceCountUtil.release(msg); + // Calling close so we will drop all queued messages in the ChunkedWriteHandler. + ctx.close(); + promise.setSuccess(); + } + }, new ChunkedWriteHandler()); + + ch.writeAndFlush(input).syncUninterruptibly(); + assertFalse(ch.finishAndReleaseAll()); + } + @Test public void testWriteListenerInvokedAfterChannelClosedAndInputNotFullyConsumed() throws Exception { // use non-empty input which has endOfInput = false