Reduce allocations in ChunkedWriteHandler when processing the queued … (#9960)

Motivation:

At the moment we create a new ChannelFutureListener per chunk when trying to write these to the underlying transport. This can be optimized by replacing the seperate write and flush call with writeAndFlush and only allocate the listener if the future is not complete yet.

Modifications:

- Replace seperate write and flush calls with writeAndFlush
- Only create listener if needed, otherwise execute directly

Result:

Less allocations
This commit is contained in:
Norman Maurer 2020-01-21 15:34:29 -08:00
parent dce7157be9
commit 70ea670ca5
1 changed files with 47 additions and 42 deletions

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
@ -253,53 +254,29 @@ public class ChunkedWriteHandler implements ChannelHandler {
message = Unpooled.EMPTY_BUFFER;
}
ChannelFuture f = ctx.write(message);
// Flush each chunk to conserve memory
ChannelFuture f = ctx.writeAndFlush(message);
if (endOfInput) {
queue.remove();
// Register a listener which will close the input once the write is complete.
// This is needed because the Chunk may have some resource bound that can not
// be closed before its not written.
//
// See https://github.com/netty/netty/issues/303
f.addListener(future -> {
if (!future.isSuccess()) {
closeInput(chunks);
currentWrite.fail(future.cause());
} else {
// read state of the input in local variables before closing it
long inputProgress = chunks.progress();
long inputLength = chunks.length();
closeInput(chunks);
currentWrite.progress(inputProgress, inputLength);
currentWrite.success(inputLength);
}
});
} else if (channel.isWritable()) {
f.addListener(future -> {
if (!future.isSuccess()) {
closeInput(chunks);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(chunks.progress(), chunks.length());
}
});
if (f.isDone()) {
handleEndOfInputFuture(f, currentWrite);
} else {
// Register a listener which will close the input once the write is complete.
// This is needed because the Chunk may have some resource bound that can not
// be closed before its not written.
//
// See https://github.com/netty/netty/issues/303
f.addListener((ChannelFutureListener) future -> handleEndOfInputFuture(future, currentWrite));
}
} else {
f.addListener(future -> {
if (!future.isSuccess()) {
closeInput(chunks);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(chunks.progress(), chunks.length());
if (channel.isWritable()) {
resumeTransfer();
}
}
});
final boolean resume = !channel.isWritable();
if (f.isDone()) {
handleFuture(f, currentWrite, resume);
} else {
f.addListener((ChannelFutureListener) future -> handleFuture(future, currentWrite, resume));
}
}
// Flush each chunk to conserve memory
ctx.flush();
requiresFlush = false;
} else {
queue.remove();
@ -318,6 +295,34 @@ public class ChunkedWriteHandler implements ChannelHandler {
}
}
private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite currentWrite) {
ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
if (!future.isSuccess()) {
closeInput(input);
currentWrite.fail(future.cause());
} else {
// read state of the input in local variables before closing it
long inputProgress = input.progress();
long inputLength = input.length();
closeInput(input);
currentWrite.progress(inputProgress, inputLength);
currentWrite.success(inputLength);
}
}
private void handleFuture(ChannelFuture future, PendingWrite currentWrite, boolean resume) {
ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
if (!future.isSuccess()) {
closeInput(input);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(input.progress(), input.length());
if (resume && future.channel().isWritable()) {
resumeTransfer();
}
}
}
private static void closeInput(ChunkedInput<?> chunks) {
try {
chunks.close();