From 70ea670ca5f9d605729faeae7e92ef3ee7e051c1 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 21 Jan 2020 15:34:29 -0800 Subject: [PATCH] =?UTF-8?q?Reduce=20allocations=20in=20ChunkedWriteHandler?= =?UTF-8?q?=20when=20processing=20the=20queued=20=E2=80=A6=20(#9960)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: At the moment we create a new ChannelFutureListener per chunk when trying to write these to the underlying transport. This can be optimized by replacing the seperate write and flush call with writeAndFlush and only allocate the listener if the future is not complete yet. Modifications: - Replace seperate write and flush calls with writeAndFlush - Only create listener if needed, otherwise execute directly Result: Less allocations --- .../handler/stream/ChunkedWriteHandler.java | 89 ++++++++++--------- 1 file changed, 47 insertions(+), 42 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index bf8da55c20..4994198bb5 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -253,53 +254,29 @@ public class ChunkedWriteHandler implements ChannelHandler { message = Unpooled.EMPTY_BUFFER; } - ChannelFuture f = ctx.write(message); + // Flush each chunk to conserve memory + ChannelFuture f = ctx.writeAndFlush(message); if (endOfInput) { queue.remove(); - // Register a listener which will close the input once the write is complete. - // This is needed because the Chunk may have some resource bound that can not - // be closed before its not written. - // - // See https://github.com/netty/netty/issues/303 - - f.addListener(future -> { - if (!future.isSuccess()) { - closeInput(chunks); - currentWrite.fail(future.cause()); - } else { - // read state of the input in local variables before closing it - long inputProgress = chunks.progress(); - long inputLength = chunks.length(); - closeInput(chunks); - currentWrite.progress(inputProgress, inputLength); - currentWrite.success(inputLength); - } - }); - } else if (channel.isWritable()) { - f.addListener(future -> { - if (!future.isSuccess()) { - closeInput(chunks); - currentWrite.fail(future.cause()); - } else { - currentWrite.progress(chunks.progress(), chunks.length()); - } - }); + if (f.isDone()) { + handleEndOfInputFuture(f, currentWrite); + } else { + // Register a listener which will close the input once the write is complete. + // This is needed because the Chunk may have some resource bound that can not + // be closed before its not written. + // + // See https://github.com/netty/netty/issues/303 + f.addListener((ChannelFutureListener) future -> handleEndOfInputFuture(future, currentWrite)); + } } else { - f.addListener(future -> { - if (!future.isSuccess()) { - closeInput(chunks); - currentWrite.fail(future.cause()); - } else { - currentWrite.progress(chunks.progress(), chunks.length()); - if (channel.isWritable()) { - resumeTransfer(); - } - } - }); + final boolean resume = !channel.isWritable(); + if (f.isDone()) { + handleFuture(f, currentWrite, resume); + } else { + f.addListener((ChannelFutureListener) future -> handleFuture(future, currentWrite, resume)); + } } - // Flush each chunk to conserve memory - ctx.flush(); requiresFlush = false; } else { queue.remove(); @@ -318,6 +295,34 @@ public class ChunkedWriteHandler implements ChannelHandler { } } + private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite currentWrite) { + ChunkedInput input = (ChunkedInput) currentWrite.msg; + if (!future.isSuccess()) { + closeInput(input); + currentWrite.fail(future.cause()); + } else { + // read state of the input in local variables before closing it + long inputProgress = input.progress(); + long inputLength = input.length(); + closeInput(input); + currentWrite.progress(inputProgress, inputLength); + currentWrite.success(inputLength); + } + } + + private void handleFuture(ChannelFuture future, PendingWrite currentWrite, boolean resume) { + ChunkedInput input = (ChunkedInput) currentWrite.msg; + if (!future.isSuccess()) { + closeInput(input); + currentWrite.fail(future.cause()); + } else { + currentWrite.progress(input.progress(), input.length()); + if (resume && future.channel().isWritable()) { + resumeTransfer(); + } + } + } + private static void closeInput(ChunkedInput chunks) { try { chunks.close();