[#1855] Try to calculate the correct amount of written bytes to update the ChannelProgressiveFuture

This commit is contained in:
Norman Maurer 2013-09-24 07:49:26 +02:00
parent 5a0d7fe19a
commit 5aa2b7e9f7

View File

@ -15,6 +15,8 @@
*/
package io.netty.handler.stream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
@ -264,6 +266,7 @@ public class ChunkedWriteHandler
message = Unpooled.EMPTY_BUFFER;
}
final int amount = amount(message);
pendingWrites.incrementAndGet();
ChannelFuture f = ctx.write(message);
if (endOfInput) {
@ -278,7 +281,7 @@ public class ChunkedWriteHandler
@Override
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
currentWrite.progress();
currentWrite.progress(amount);
currentWrite.success();
closeInput(chunks);
}
@ -292,7 +295,7 @@ public class ChunkedWriteHandler
closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause());
} else {
currentWrite.progress();
currentWrite.progress(amount);
}
}
});
@ -305,7 +308,7 @@ public class ChunkedWriteHandler
closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause());
} else {
currentWrite.progress();
currentWrite.progress(amount);
if (isWritable()) {
resumeTransfer();
}
@ -363,11 +366,21 @@ public class ChunkedWriteHandler
promise.setSuccess();
}
void progress() {
progress ++;
void progress(int amount) {
progress += amount;
if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).tryProgress(progress, -1);
}
}
}
private static int amount(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return 1;
}
}