Allow ChunkedInput to provide the progress of its transfer

Related issue: #2741 and #2151

Motivation:

There is no way for ChunkedWriteHandler to know the progress of the
transfer of a ChannelInput. Therefore, ChannelProgressiveFutureListener
cannot get exact information about the progress of the transfer.

If you add a few methods that optionally provides the transfer progress
to ChannelInput, it becomes possible for ChunkedWriteHandler to notify
ChannelProgressiveFutureListeners.

If the input has no definite length, we can still use the progress so
far, and consider the length of the input as 'undefined'.

Modifications:

- Add ChunkedInput.progress() and ChunkedInput.length()
- Modify ChunkedWriteHandler to use progress() and length() to notify
  the transfer progress

Result:

ChunkedWriteHandler now notifies ChannelProgressiveFutureListener.
This commit is contained in:
plucury 2014-08-13 22:52:24 +08:00 committed by Trustin Lee
parent d92875402d
commit a2416481e3
9 changed files with 108 additions and 24 deletions

View File

@ -96,4 +96,14 @@ public class HttpChunkedInput implements ChunkedInput<HttpContent> {
return new DefaultHttpContent(buf); return new DefaultHttpContent(buf);
} }
} }
@Override
public long length() {
return input.length();
}
@Override
public long progress() {
return input.progress();
}
} }

View File

@ -246,11 +246,14 @@ public class HttpPostRequestEncoder implements ChunkedInput<HttpContent> {
* While adding a FileUpload, is the multipart currently in Mixed Mode * While adding a FileUpload, is the multipart currently in Mixed Mode
*/ */
private boolean duringMixedMode; private boolean duringMixedMode;
/** /**
* Global Body size * Global Body size
*/ */
private long globalBodySize; private long globalBodySize;
/**
* Global Transfer progress
*/
private long globalProgress;
/** /**
* True if this request is a Multipart request * True if this request is a Multipart request
@ -999,7 +1002,9 @@ public class HttpPostRequestEncoder implements ChunkedInput<HttpContent> {
if (isLastChunkSent) { if (isLastChunkSent) {
return null; return null;
} else { } else {
return nextChunk(); HttpContent nextChunk = nextChunk();
globalProgress += nextChunk.content().readableBytes();
return nextChunk;
} }
} }
@ -1085,6 +1090,16 @@ public class HttpPostRequestEncoder implements ChunkedInput<HttpContent> {
return isLastChunkSent; return isLastChunkSent;
} }
@Override
public long length() {
return isMultipart? globalBodySize : globalBodySize - 1;
}
@Override
public long progress() {
return globalProgress;
}
/** /**
* Exception when an error occurs while encoding * Exception when an error occurs while encoding
*/ */

View File

@ -161,4 +161,14 @@ public class ChunkedFile implements ChunkedInput<ByteBuf> {
} }
} }
} }
@Override
public long length() {
return endOffset - startOffset;
}
@Override
public long progress() {
return offset - startOffset;
}
} }

View File

@ -47,4 +47,16 @@ public interface ChunkedInput<B> {
*/ */
B readChunk(ChannelHandlerContext ctx) throws Exception; B readChunk(ChannelHandlerContext ctx) throws Exception;
/**
* Returns the length of the input.
* @return the length of the input if the length of the input is known.
* a negative value if the length of the input is unknown.
*/
long length();
/**
* Returns current transfer progress.
*/
long progress();
} }

View File

@ -172,4 +172,14 @@ public class ChunkedNioFile implements ChunkedInput<ByteBuf> {
} }
} }
} }
@Override
public long length() {
return endOffset - startOffset;
}
@Override
public long progress() {
return offset - startOffset;
}
} }

View File

@ -128,4 +128,14 @@ public class ChunkedNioStream implements ChunkedInput<ByteBuf> {
} }
} }
} }
@Override
public long length() {
return -1;
}
@Override
public long progress() {
return offset;
}
} }

View File

@ -120,4 +120,14 @@ public class ChunkedStream implements ChunkedInput<ByteBuf> {
} }
} }
} }
@Override
public long length() {
return -1;
}
@Override
public long progress() {
return offset;
}
} }

View File

@ -179,7 +179,7 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter {
} }
currentWrite.fail(cause); currentWrite.fail(cause);
} else { } else {
currentWrite.success(); currentWrite.success(in.length());
} }
closeInput(in); closeInput(in);
} catch (Exception e) { } catch (Exception e) {
@ -253,7 +253,6 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter {
message = Unpooled.EMPTY_BUFFER; message = Unpooled.EMPTY_BUFFER;
} }
final int amount = amount(message);
ChannelFuture f = ctx.write(message); ChannelFuture f = ctx.write(message);
if (endOfInput) { if (endOfInput) {
this.currentWrite = null; this.currentWrite = null;
@ -266,8 +265,8 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter {
f.addListener(new ChannelFutureListener() { f.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
currentWrite.progress(amount); currentWrite.progress(chunks.progress(), chunks.length());
currentWrite.success(); currentWrite.success(chunks.length());
closeInput(chunks); closeInput(chunks);
} }
}); });
@ -279,7 +278,7 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter {
closeInput((ChunkedInput<?>) pendingMessage); closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause()); currentWrite.fail(future.cause());
} else { } else {
currentWrite.progress(amount); currentWrite.progress(chunks.progress(), chunks.length());
} }
} }
}); });
@ -291,7 +290,7 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter {
closeInput((ChunkedInput<?>) pendingMessage); closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause()); currentWrite.fail(future.cause());
} else { } else {
currentWrite.progress(amount); currentWrite.progress(chunks.progress(), chunks.length());
if (channel.isWritable()) { if (channel.isWritable()) {
resumeTransfer(); resumeTransfer();
} }
@ -327,7 +326,6 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter {
private static final class PendingWrite { private static final class PendingWrite {
final Object msg; final Object msg;
final ChannelPromise promise; final ChannelPromise promise;
private long progress;
PendingWrite(Object msg, ChannelPromise promise) { PendingWrite(Object msg, ChannelPromise promise) {
this.msg = msg; this.msg = msg;
@ -339,7 +337,7 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter {
promise.tryFailure(cause); promise.tryFailure(cause);
} }
void success() { void success(long total) {
if (promise.isDone()) { if (promise.isDone()) {
// No need to notify the progress or fulfill the promise because it's done already. // No need to notify the progress or fulfill the promise because it's done already.
return; return;
@ -347,27 +345,16 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter {
if (promise instanceof ChannelProgressivePromise) { if (promise instanceof ChannelProgressivePromise) {
// Now we know what the total is. // Now we know what the total is.
((ChannelProgressivePromise) promise).tryProgress(progress, progress); ((ChannelProgressivePromise) promise).tryProgress(total, total);
} }
promise.trySuccess(); promise.trySuccess();
} }
void progress(int amount) { void progress(long progress, long total) {
progress += amount;
if (promise instanceof ChannelProgressivePromise) { if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).tryProgress(progress, -1); ((ChannelProgressivePromise) promise).tryProgress(progress, total);
} }
} }
} }
private static int amount(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return 1;
}
} }

View File

@ -124,6 +124,16 @@ public class ChunkedWriteHandlerTest {
done = true; done = true;
return buffer.duplicate().retain(); return buffer.duplicate().retain();
} }
@Override
public long length() {
return -1;
}
@Override
public long progress() {
return 1;
}
}; };
final AtomicBoolean listenerNotified = new AtomicBoolean(false); final AtomicBoolean listenerNotified = new AtomicBoolean(false);
@ -171,6 +181,16 @@ public class ChunkedWriteHandlerTest {
done = true; done = true;
return 0; return 0;
} }
@Override
public long length() {
return -1;
}
@Override
public long progress() {
return 1;
}
}; };
EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler()); EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());