[#1855] Try to calculate the correct amount of written bytes to update the ChannelProgressiveFuture
This commit is contained in:
parent
ce58e76e13
commit
29d34c672c
@ -15,6 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.handler.stream;
|
package io.netty.handler.stream;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufHolder;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelDuplexHandler;
|
import io.netty.channel.ChannelDuplexHandler;
|
||||||
@ -264,6 +266,7 @@ public class ChunkedWriteHandler
|
|||||||
message = Unpooled.EMPTY_BUFFER;
|
message = Unpooled.EMPTY_BUFFER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final int amount = amount(message);
|
||||||
pendingWrites.incrementAndGet();
|
pendingWrites.incrementAndGet();
|
||||||
ChannelFuture f = ctx.write(message);
|
ChannelFuture f = ctx.write(message);
|
||||||
if (endOfInput) {
|
if (endOfInput) {
|
||||||
@ -278,7 +281,7 @@ public class ChunkedWriteHandler
|
|||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
pendingWrites.decrementAndGet();
|
pendingWrites.decrementAndGet();
|
||||||
currentWrite.progress();
|
currentWrite.progress(amount);
|
||||||
currentWrite.success();
|
currentWrite.success();
|
||||||
closeInput(chunks);
|
closeInput(chunks);
|
||||||
}
|
}
|
||||||
@ -292,7 +295,7 @@ public class ChunkedWriteHandler
|
|||||||
closeInput((ChunkedInput<?>) pendingMessage);
|
closeInput((ChunkedInput<?>) pendingMessage);
|
||||||
currentWrite.fail(future.cause());
|
currentWrite.fail(future.cause());
|
||||||
} else {
|
} else {
|
||||||
currentWrite.progress();
|
currentWrite.progress(amount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -305,7 +308,7 @@ public class ChunkedWriteHandler
|
|||||||
closeInput((ChunkedInput<?>) pendingMessage);
|
closeInput((ChunkedInput<?>) pendingMessage);
|
||||||
currentWrite.fail(future.cause());
|
currentWrite.fail(future.cause());
|
||||||
} else {
|
} else {
|
||||||
currentWrite.progress();
|
currentWrite.progress(amount);
|
||||||
if (isWritable()) {
|
if (isWritable()) {
|
||||||
resumeTransfer();
|
resumeTransfer();
|
||||||
}
|
}
|
||||||
@ -363,11 +366,21 @@ public class ChunkedWriteHandler
|
|||||||
promise.setSuccess();
|
promise.setSuccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
void progress() {
|
void progress(int amount) {
|
||||||
progress ++;
|
progress += amount;
|
||||||
if (promise instanceof ChannelProgressivePromise) {
|
if (promise instanceof ChannelProgressivePromise) {
|
||||||
((ChannelProgressivePromise) promise).tryProgress(progress, -1);
|
((ChannelProgressivePromise) promise).tryProgress(progress, -1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static int amount(Object msg) {
|
||||||
|
if (msg instanceof ByteBuf) {
|
||||||
|
return ((ByteBuf) msg).readableBytes();
|
||||||
|
}
|
||||||
|
if (msg instanceof ByteBufHolder) {
|
||||||
|
return ((ByteBufHolder) msg).content().readableBytes();
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user