From 29d34c672c1cd64c153add03a5340574fdcb7cfe Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 24 Sep 2013 07:49:26 +0200 Subject: [PATCH] [#1855] Try to calculate the correct amount of written bytes to update the ChannelProgressiveFuture --- .../handler/stream/ChunkedWriteHandler.java | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 7d226afe50..ed03a06fc6 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -15,6 +15,8 @@ */ package io.netty.handler.stream; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -264,6 +266,7 @@ public class ChunkedWriteHandler message = Unpooled.EMPTY_BUFFER; } + final int amount = amount(message); pendingWrites.incrementAndGet(); ChannelFuture f = ctx.write(message); if (endOfInput) { @@ -278,7 +281,7 @@ public class ChunkedWriteHandler @Override public void operationComplete(ChannelFuture future) throws Exception { pendingWrites.decrementAndGet(); - currentWrite.progress(); + currentWrite.progress(amount); currentWrite.success(); closeInput(chunks); } @@ -292,7 +295,7 @@ public class ChunkedWriteHandler closeInput((ChunkedInput) pendingMessage); currentWrite.fail(future.cause()); } else { - currentWrite.progress(); + currentWrite.progress(amount); } } }); @@ -305,7 +308,7 @@ public class ChunkedWriteHandler closeInput((ChunkedInput) pendingMessage); currentWrite.fail(future.cause()); } else { - currentWrite.progress(); + currentWrite.progress(amount); if (isWritable()) { resumeTransfer(); } @@ -363,11 +366,21 @@ public class ChunkedWriteHandler promise.setSuccess(); } - void progress() { - progress ++; + void progress(int amount) { + progress += amount; if (promise instanceof ChannelProgressivePromise) { ((ChannelProgressivePromise) promise).tryProgress(progress, -1); } } } + + private static int amount(Object msg) { + if (msg instanceof ByteBuf) { + return ((ByteBuf) msg).readableBytes(); + } + if (msg instanceof ByteBufHolder) { + return ((ByteBufHolder) msg).content().readableBytes(); + } + return 1; + } }