Reduce allocations in ChunkedWriteHandler when processing the queued … (#9960)

Motivation:

At the moment we create a new ChannelFutureListener per chunk when trying to write these to the underlying transport. This can be optimized by replacing the seperate write and flush call with writeAndFlush and only allocate the listener if the future is not complete yet.

Modifications:

- Replace seperate write and flush calls with writeAndFlush
- Only create listener if needed, otherwise execute directly

Result:

Less allocations
This commit is contained in:
Norman Maurer 2020-01-21 15:34:29 -08:00 committed by GitHub
parent 69cd042401
commit de3b3678d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -261,61 +261,39 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
message = Unpooled.EMPTY_BUFFER;
}
ChannelFuture f = ctx.write(message);
// Flush each chunk to conserve memory
ChannelFuture f = ctx.writeAndFlush(message);
if (endOfInput) {
queue.remove();
// Register a listener which will close the input once the write is complete.
// This is needed because the Chunk may have some resource bound that can not
// be closed before its not written.
//
// See https://github.com/netty/netty/issues/303
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
closeInput(chunks);
currentWrite.fail(future.cause());
} else {
// read state of the input in local variables before closing it
long inputProgress = chunks.progress();
long inputLength = chunks.length();
closeInput(chunks);
currentWrite.progress(inputProgress, inputLength);
currentWrite.success(inputLength);
if (f.isDone()) {
handleEndOfInputFuture(f, currentWrite);
} else {
// Register a listener which will close the input once the write is complete.
// This is needed because the Chunk may have some resource bound that can not
// be closed before its not written.
//
// See https://github.com/netty/netty/issues/303
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
handleEndOfInputFuture(future, currentWrite);
}
}
});
} else if (channel.isWritable()) {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
closeInput(chunks);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(chunks.progress(), chunks.length());
}
}
});
});
}
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
closeInput(chunks);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(chunks.progress(), chunks.length());
if (channel.isWritable()) {
resumeTransfer();
}
final boolean resume = !channel.isWritable();
if (f.isDone()) {
handleFuture(f, currentWrite, resume);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
handleFuture(future, currentWrite, resume);
}
}
});
});
}
}
// Flush each chunk to conserve memory
ctx.flush();
requiresFlush = false;
} else {
queue.remove();
@ -334,6 +312,34 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
}
}
private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite currentWrite) {
ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
if (!future.isSuccess()) {
closeInput(input);
currentWrite.fail(future.cause());
} else {
// read state of the input in local variables before closing it
long inputProgress = input.progress();
long inputLength = input.length();
closeInput(input);
currentWrite.progress(inputProgress, inputLength);
currentWrite.success(inputLength);
}
}
private void handleFuture(ChannelFuture future, PendingWrite currentWrite, boolean resume) {
ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
if (!future.isSuccess()) {
closeInput(input);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(input.progress(), input.length());
if (resume && future.channel().isWritable()) {
resumeTransfer();
}
}
}
private static void closeInput(ChunkedInput<?> chunks) {
try {
chunks.close();