diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java index a6d317a1ff..652ba6ae6f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java @@ -96,4 +96,14 @@ public class HttpChunkedInput implements ChunkedInput { return new DefaultHttpContent(buf); } } + + @Override + public long length() { + return input.length(); + } + + @Override + public long progress() { + return input.progress(); + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java index e984b5f597..044acf449d 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java @@ -244,11 +244,14 @@ public class HttpPostRequestEncoder implements ChunkedInput { * While adding a FileUpload, is the multipart currently in Mixed Mode */ private boolean duringMixedMode; - /** * Global Body size */ private long globalBodySize; + /** + * Global Transfer progress + */ + private long globalProgress; /** * True if this request is a Multipart request @@ -997,7 +1000,9 @@ public class HttpPostRequestEncoder implements ChunkedInput { if (isLastChunkSent) { return null; } else { - return nextChunk(); + HttpContent nextChunk = nextChunk(); + globalProgress += nextChunk.content().readableBytes(); + return nextChunk; } } @@ -1083,6 +1088,16 @@ public class HttpPostRequestEncoder implements ChunkedInput { return isLastChunkSent; } + @Override + public long length() { + return isMultipart? globalBodySize : globalBodySize - 1; + } + + @Override + public long progress() { + return globalProgress; + } + /** * Exception when an error occurs while encoding */ diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java b/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java index aaa7c7bf96..69d1efb141 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java @@ -161,4 +161,14 @@ public class ChunkedFile implements ChunkedInput { } } } + + @Override + public long length() { + return endOffset - startOffset; + } + + @Override + public long progress() { + return offset - startOffset; + } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java b/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java index fa6fd85322..4c44bf9efc 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java @@ -47,4 +47,16 @@ public interface ChunkedInput { */ B readChunk(ChannelHandlerContext ctx) throws Exception; + /** + * Returns the length of the input. + * @return the length of the input if the length of the input is known. + * a negative value if the length of the input is unknown. + */ + long length(); + + /** + * Returns current transfer progress. + */ + long progress(); + } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java b/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java index 6032644c09..dbb0521d4d 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java @@ -172,4 +172,14 @@ public class ChunkedNioFile implements ChunkedInput { } } } + + @Override + public long length() { + return endOffset - startOffset; + } + + @Override + public long progress() { + return offset - startOffset; + } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java b/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java index f6dcc754ba..fd59e74477 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java @@ -128,4 +128,14 @@ public class ChunkedNioStream implements ChunkedInput { } } } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return offset; + } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java b/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java index e50d4fbc57..bcaa633453 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java @@ -120,4 +120,14 @@ public class ChunkedStream implements ChunkedInput { } } } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return offset; + } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 9f8ba6b176..39bd22b109 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -179,7 +179,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { } currentWrite.fail(cause); } else { - currentWrite.success(); + currentWrite.success(in.length()); } closeInput(in); } catch (Exception e) { @@ -253,7 +253,6 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { message = Unpooled.EMPTY_BUFFER; } - final int amount = amount(message); ChannelFuture f = ctx.write(message); if (endOfInput) { this.currentWrite = null; @@ -266,8 +265,8 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - currentWrite.progress(amount); - currentWrite.success(); + currentWrite.progress(chunks.progress(), chunks.length()); + currentWrite.success(chunks.length()); closeInput(chunks); } }); @@ -279,7 +278,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { closeInput((ChunkedInput) pendingMessage); currentWrite.fail(future.cause()); } else { - currentWrite.progress(amount); + currentWrite.progress(chunks.progress(), chunks.length()); } } }); @@ -291,7 +290,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { closeInput((ChunkedInput) pendingMessage); currentWrite.fail(future.cause()); } else { - currentWrite.progress(amount); + currentWrite.progress(chunks.progress(), chunks.length()); if (channel.isWritable()) { resumeTransfer(); } @@ -327,7 +326,6 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { private static final class PendingWrite { final Object msg; final ChannelPromise promise; - private long progress; PendingWrite(Object msg, ChannelPromise promise) { this.msg = msg; @@ -339,7 +337,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { promise.tryFailure(cause); } - void success() { + void success(long total) { if (promise.isDone()) { // No need to notify the progress or fulfill the promise because it's done already. return; @@ -347,27 +345,16 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { if (promise instanceof ChannelProgressivePromise) { // Now we know what the total is. - ((ChannelProgressivePromise) promise).tryProgress(progress, progress); + ((ChannelProgressivePromise) promise).tryProgress(total, total); } promise.trySuccess(); } - void progress(int amount) { - progress += amount; + void progress(long progress, long total) { if (promise instanceof ChannelProgressivePromise) { - ((ChannelProgressivePromise) promise).tryProgress(progress, -1); + ((ChannelProgressivePromise) promise).tryProgress(progress, total); } } } - - private static int amount(Object msg) { - if (msg instanceof ByteBuf) { - return ((ByteBuf) msg).readableBytes(); - } - if (msg instanceof ByteBufHolder) { - return ((ByteBufHolder) msg).content().readableBytes(); - } - return 1; - } } diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java index 5487992938..204e140f4b 100644 --- a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java @@ -124,6 +124,16 @@ public class ChunkedWriteHandlerTest { done = true; return buffer.duplicate().retain(); } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return 1; + } }; final AtomicBoolean listenerNotified = new AtomicBoolean(false); @@ -171,6 +181,16 @@ public class ChunkedWriteHandlerTest { done = true; return 0; } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return 1; + } }; EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());