ChunkedWriteHandler flushes too often

Motivation:

ChunkedWriteHandler queues written messages and actually writes them
when flush is called. In its doFlush method, it needs to flush after
each chunk is written to preserve memory. However, non-chunked messages
(those that aren't of type ChunkedInput) are treated in the same way,
which means that flush is called after each message is written.

Modifications:

Moved the call to flush() inside the if block that tests if the message
is an instance of ChunkedInput. To ensure flush is called at least once,
the existing boolean flushed is checked at the end of doFlush. This
check was previously in ChunkedWriteHandler.flush(), but wasn't checked in
other invocations of doFlush, e.g. in channelInactive.

Result:

When this handler is present in a pipeline, writing a series
of non-chunked messages will be flushed as the developer intended.
This commit is contained in:
Michael O'Brien 2017-03-10 16:19:45 +00:00 committed by Norman Maurer
parent 2b8c8e0805
commit 61f53c4d07
2 changed files with 20 additions and 14 deletions

View File

@ -134,10 +134,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (!doFlush(ctx)) {
// Make sure to flush at least once.
ctx.flush();
}
doFlush(ctx);
}
@Override
@ -195,14 +192,14 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
}
}
private boolean doFlush(final ChannelHandlerContext ctx) throws Exception {
private void doFlush(final ChannelHandlerContext ctx) throws Exception {
final Channel channel = ctx.channel();
if (!channel.isActive()) {
discard(null);
return false;
return;
}
boolean flushed = false;
boolean requiresFlush = true;
ByteBufAllocator allocator = ctx.alloc();
while (channel.isWritable()) {
if (currentWrite == null) {
@ -300,22 +297,24 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
}
});
}
// Flush each chunk to conserve memory
ctx.flush();
requiresFlush = false;
} else {
ctx.write(pendingMessage, currentWrite.promise);
this.currentWrite = null;
requiresFlush = true;
}
// Always need to flush
ctx.flush();
flushed = true;
if (!channel.isActive()) {
discard(new ClosedChannelException());
break;
}
}
return flushed;
if (requiresFlush) {
ctx.flush();
}
}
static void closeInput(ChunkedInput<?> chunks) {

View File

@ -98,6 +98,13 @@ public class ChunkedWriteHandlerTest {
check(new ChunkedNioFile(TMP), new ChunkedNioFile(TMP), new ChunkedNioFile(TMP));
}
@Test
public void testUnchunkedData() throws IOException {
check(Unpooled.wrappedBuffer(BYTES));
check(Unpooled.wrappedBuffer(BYTES), Unpooled.wrappedBuffer(BYTES), Unpooled.wrappedBuffer(BYTES));
}
// Test case which shows that there is not a bug like stated here:
// http://stackoverflow.com/a/10426305
@Test
@ -220,10 +227,10 @@ public class ChunkedWriteHandlerTest {
assertNull(ch.readOutbound());
}
private static void check(ChunkedInput<?>... inputs) {
private static void check(Object... inputs) {
EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());
for (ChunkedInput<?> input: inputs) {
for (Object input: inputs) {
ch.writeOutbound(input);
}