Ensure removal from queue happens before writeAndFlush(...) is called (#11049)

Motivation:

We need to ensure that we call queue.remove() before we cal writeAndFlush() as this operation may cause an event that also touches the queue and remove from it. If we miss to do so we may see NoSuchElementExceptions.

Modifications:

- Call queue.remove() before calling writeAndFlush(...)
- Add unit test

Result:

Fixes https://github.com/netty/netty/issues/11046
This commit is contained in:
Norman Maurer 2021-03-02 21:56:59 +01:00 committed by GitHub
parent 690d1a53d5
commit 18c66a9d70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 2 deletions

View File

@ -261,11 +261,14 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
message = Unpooled.EMPTY_BUFFER;
}
if (endOfInput) {
// We need to remove the element from the queue before we call writeAndFlush() as this operation
// may cause an action that also touches the queue.
queue.remove();
}
// Flush each chunk to conserve memory
ChannelFuture f = ctx.writeAndFlush(message);
if (endOfInput) {
queue.remove();
if (f.isDone()) {
handleEndOfInputFuture(f, currentWrite);
} else {

View File

@ -585,6 +585,55 @@ public class ChunkedWriteHandlerTest {
assertFalse(ch.finish());
}
@Test
public void testEndOfInputWhenChannelIsClosedwhenWrite() {
ChunkedInput<ByteBuf> input = new ChunkedInput<ByteBuf>() {
@Override
public boolean isEndOfInput() {
return true;
}
@Override
public void close() {
}
@Deprecated
@Override
public ByteBuf readChunk(ChannelHandlerContext ctx) {
return null;
}
@Override
public ByteBuf readChunk(ByteBufAllocator allocator) {
return null;
}
@Override
public long length() {
return -1;
}
@Override
public long progress() {
return 1;
}
};
EmbeddedChannel ch = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ReferenceCountUtil.release(msg);
// Calling close so we will drop all queued messages in the ChunkedWriteHandler.
ctx.close();
promise.setSuccess();
}
}, new ChunkedWriteHandler());
ch.writeAndFlush(input).syncUninterruptibly();
assertFalse(ch.finishAndReleaseAll());
}
@Test
public void testWriteListenerInvokedAfterChannelClosedAndInputNotFullyConsumed() throws Exception {
// use non-empty input which has endOfInput = false