From f96a8e595171a5ed75a05ea16da07647de54ca64 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 19 Jul 2013 12:53:23 +0900 Subject: [PATCH] Implement ProgressivePromise notification in NIO byte channels and ChunkedWriteHandler - Refine the contract of GenericProgressiveFutureListener. - Negative 'total' now means 'unknown', which is useful for ChunkedWriteHandler. --- .../util/concurrent/DefaultProgressivePromise.java | 8 +++++++- .../GenericProgressiveFutureListener.java | 7 +++++++ .../http/file/HttpStaticFileServerHandler.java | 14 +++++++++++++- .../netty/handler/stream/ChunkedWriteHandler.java | 14 ++++++++++++++ .../netty/channel/nio/AbstractNioByteChannel.java | 7 +++++++ .../netty/channel/socket/nio/NioSocketChannel.java | 1 + 6 files changed, 49 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java index 177e08edb4..d3c32a5386 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java @@ -34,7 +34,13 @@ public class DefaultProgressivePromise extends DefaultPromise implements P @Override public ProgressivePromise setProgress(long progress, long total) { - if (progress < 0 || progress > total) { + if (total < 0) { + // total unknown + total = -1; // normalize + if (progress < 0) { + throw new IllegalArgumentException("progress: " + progress + " (expected: >= 0)"); + } + } else if (progress < 0 || progress > total) { throw new IllegalArgumentException( "progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))"); } diff --git a/common/src/main/java/io/netty/util/concurrent/GenericProgressiveFutureListener.java b/common/src/main/java/io/netty/util/concurrent/GenericProgressiveFutureListener.java index 17fef13ec2..d1a5c13c76 100644 --- a/common/src/main/java/io/netty/util/concurrent/GenericProgressiveFutureListener.java +++ b/common/src/main/java/io/netty/util/concurrent/GenericProgressiveFutureListener.java @@ -17,5 +17,12 @@ package io.netty.util.concurrent; public interface GenericProgressiveFutureListener> extends GenericFutureListener { + /** + * Invoked when the operation has progressed. + * + * @param progress the progress of the operation so far (cumulative) + * @param total the number that signifies the end of the operation when {@code progress} reaches at it. + * {@code -1} if the end of operation is unknown. + */ void operationProgressed(F future, long progress, long total) throws Exception; } diff --git a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java index c14418948f..9e85fa0f98 100644 --- a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java +++ b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java @@ -20,6 +20,8 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelProgressiveFuture; +import io.netty.channel.ChannelProgressiveFutureListener; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; @@ -180,7 +182,17 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler) pendingMessage); currentWrite.fail(future.cause()); + } else { + currentWrite.progress(); } } }); @@ -294,7 +297,10 @@ public class ChunkedWriteHandler closeInput((ChunkedInput) pendingMessage); currentWrite.fail(future.cause()); } else if (isWritable()) { + currentWrite.progress(); resumeTransfer(); + } else { + currentWrite.progress(); } } }); @@ -327,6 +333,7 @@ public class ChunkedWriteHandler private static final class PendingWrite { final Object msg; final ChannelPromise promise; + private long progress; PendingWrite(Object msg, ChannelPromise promise) { this.msg = msg; @@ -339,5 +346,12 @@ public class ChunkedWriteHandler promise.setFailure(cause); } } + + void progress() { + progress ++; + if (promise instanceof ChannelProgressivePromise) { + ((ChannelProgressivePromise) promise).setProgress(progress, -1); + } + } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 6b73caac04..6533fd6916 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -162,12 +162,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } boolean done = false; + long flushedAmount = 0; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { break; } + flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; @@ -178,6 +180,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { in.remove(); } else { // Did not write completely. + in.progress(flushedAmount); if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } @@ -186,11 +189,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean done = false; + long flushedAmount = 0; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0) { break; } + + flushedAmount += localFlushedAmount; if (region.transfered() >= region.count()) { done = true; break; @@ -201,6 +207,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { in.remove(); } else { // Did not write completely. + in.progress(flushedAmount); if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 5157ca5d3f..4531b24348 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -296,6 +296,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty writtenBytes -= readableBytes; } else if (readableBytes > writtenBytes) { buf.readerIndex(readerIndex + (int) writtenBytes); + in.progress(writtenBytes); break; } else { // readable == writtenBytes in.remove();