Implement ProgressivePromise notification in NIO byte channels and ChunkedWriteHandler

- Refine the contract of GenericProgressiveFutureListener.
- Negative 'total' now means 'unknown', which is useful for ChunkedWriteHandler.
This commit is contained in:
Trustin Lee 2013-07-19 12:53:23 +09:00
parent 0653efcd75
commit f96a8e5951
6 changed files with 49 additions and 2 deletions

View File

@ -34,7 +34,13 @@ public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements P
@Override
public ProgressivePromise<V> setProgress(long progress, long total) {
if (progress < 0 || progress > total) {
if (total < 0) {
// total unknown
total = -1; // normalize
if (progress < 0) {
throw new IllegalArgumentException("progress: " + progress + " (expected: >= 0)");
}
} else if (progress < 0 || progress > total) {
throw new IllegalArgumentException(
"progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))");
}

View File

@ -17,5 +17,12 @@
package io.netty.util.concurrent;
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
/**
* Invoked when the operation has progressed.
*
* @param progress the progress of the operation so far (cumulative)
* @param total the number that signifies the end of the operation when {@code progress} reaches at it.
* {@code -1} if the end of operation is unknown.
*/
void operationProgressed(F future, long progress, long total) throws Exception;
}

View File

@ -20,6 +20,8 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
@ -180,7 +182,17 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
// Write the initial line and the header.
ctx.write(response);
// Write the content.
ctx.write(new ChunkedFile(raf, 0, fileLength, 8192));
ctx.write(new ChunkedFile(raf, 0, fileLength, 8192), ctx.newProgressivePromise()).addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
System.err.println("Transfer progress: " + progress);
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.err.println("Transfer complete.");
}
});
// Write the end marker
ChannelFuture writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -282,6 +283,8 @@ public class ChunkedWriteHandler
if (!future.isSuccess()) {
closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause());
} else {
currentWrite.progress();
}
}
});
@ -294,7 +297,10 @@ public class ChunkedWriteHandler
closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause());
} else if (isWritable()) {
currentWrite.progress();
resumeTransfer();
} else {
currentWrite.progress();
}
}
});
@ -327,6 +333,7 @@ public class ChunkedWriteHandler
private static final class PendingWrite {
final Object msg;
final ChannelPromise promise;
private long progress;
PendingWrite(Object msg, ChannelPromise promise) {
this.msg = msg;
@ -339,5 +346,12 @@ public class ChunkedWriteHandler
promise.setFailure(cause);
}
}
void progress() {
progress ++;
if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).setProgress(progress, -1);
}
}
}
}

View File

@ -162,12 +162,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
boolean done = false;
long flushedAmount = 0;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
@ -178,6 +180,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
in.remove();
} else {
// Did not write completely.
in.progress(flushedAmount);
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
@ -186,11 +189,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = false;
long flushedAmount = 0;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
break;
}
flushedAmount += localFlushedAmount;
if (region.transfered() >= region.count()) {
done = true;
break;
@ -201,6 +207,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
in.remove();
} else {
// Did not write completely.
in.progress(flushedAmount);
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}

View File

@ -296,6 +296,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
break;
} else { // readable == writtenBytes
in.remove();