diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 1853634d8d..c3bbae21ae 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -43,8 +43,8 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl } @Override - public ProgressivePromise newProgressivePromise(long total) { - return new DefaultProgressivePromise(this, total); + public ProgressivePromise newProgressivePromise() { + return new DefaultProgressivePromise(this); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java index 8f0f4bb85f..d19f890928 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultProgressivePromise.java @@ -18,70 +18,42 @@ package io.netty.util.concurrent; public class DefaultProgressivePromise extends DefaultPromise implements ProgressivePromise { - private final long total; - private volatile long progress; - /** * Creates a new instance. * - * It is preferable to use {@link EventExecutor#newProgressivePromise(long)} to create a new progressive promise + * It is preferable to use {@link EventExecutor#newProgressivePromise()} to create a new progressive promise * * @param executor * the {@link EventExecutor} which is used to notify the promise when it progresses or it is complete */ - public DefaultProgressivePromise(EventExecutor executor, long total) { + public DefaultProgressivePromise(EventExecutor executor) { super(executor); - validateTotal(total); - this.total = total; } - protected DefaultProgressivePromise(long total) { - /* only for subclasses */ - validateTotal(total); - this.total = total; - } - - private static void validateTotal(long total) { - if (total < 0) { - throw new IllegalArgumentException("total: " + total + " (expected: >= 0)"); - } - } + protected DefaultProgressivePromise() { /* only for subclasses */ } @Override - public long progress() { - return progress; - } - - @Override - public long total() { - return total; - } - - @Override - public ProgressivePromise setProgress(long progress) { + public ProgressivePromise setProgress(long progress, long total) { if (progress < 0 || progress > total) { throw new IllegalArgumentException( - "progress: " + progress + " (expected: 0 <= progress <= " + total + ')'); + "progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))"); } if (isDone()) { throw new IllegalStateException("complete already"); } - long oldProgress = this.progress; - this.progress = progress; - notifyProgressiveListeners(progress - oldProgress); + notifyProgressiveListeners(progress, total); return this; } @Override - public boolean tryProgress(long progress) { + public boolean tryProgress(long progress, long total) { if (progress < 0 || progress > total || isDone()) { return false; } - this.progress = progress; - notifyProgressiveListeners(progress); + notifyProgressiveListeners(progress, total); return true; } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 7dcc4255ff..036fb6c74a 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -605,7 +605,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } @SuppressWarnings({ "unchecked", "rawtypes" }) - void notifyProgressiveListeners(final long delta) { + void notifyProgressiveListeners(final long progress, final long total) { final Object listeners = progressiveListeners(); if (listeners == null) { return; @@ -616,10 +616,11 @@ public class DefaultPromise extends AbstractFuture implements Promise { EventExecutor executor = executor(); if (executor.inEventLoop()) { if (listeners instanceof GenericProgressiveFutureListener[]) { - notifyProgressiveListeners0(self, (GenericProgressiveFutureListener[]) listeners, delta); + notifyProgressiveListeners0( + self, (GenericProgressiveFutureListener[]) listeners, progress, total); } else { notifyProgressiveListener0( - self, (GenericProgressiveFutureListener>) listeners, delta); + self, (GenericProgressiveFutureListener>) listeners, progress, total); } } else { try { @@ -629,7 +630,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { executor.execute(new Runnable() { @Override public void run() { - notifyProgressiveListeners0(self, array, delta); + notifyProgressiveListeners0(self, array, progress, total); } }); } else { @@ -638,7 +639,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { executor.execute(new Runnable() { @Override public void run() { - notifyProgressiveListener0(self, l, delta); + notifyProgressiveListener0(self, l, progress, total); } }); } @@ -649,20 +650,20 @@ public class DefaultPromise extends AbstractFuture implements Promise { } private static void notifyProgressiveListeners0( - ProgressiveFuture future, GenericProgressiveFutureListener[] listeners, long delta) { + ProgressiveFuture future, GenericProgressiveFutureListener[] listeners, long progress, long total) { for (GenericProgressiveFutureListener l: listeners) { if (l == null) { break; } - notifyProgressiveListener0(future, l, delta); + notifyProgressiveListener0(future, l, progress, total); } } @SuppressWarnings({ "unchecked", "rawtypes" }) private static void notifyProgressiveListener0( - ProgressiveFuture future, GenericProgressiveFutureListener l, long delta) { + ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) { try { - l.operationProgressed(future, delta); + l.operationProgressed(future, progress, total); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t); diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutor.java b/common/src/main/java/io/netty/util/concurrent/EventExecutor.java index 660a46f7dd..5f5c3729f1 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutor.java @@ -54,7 +54,7 @@ public interface EventExecutor extends EventExecutorGroup { /** * Create a new {@link ProgressivePromise}. */ - ProgressivePromise newProgressivePromise(long total); + ProgressivePromise newProgressivePromise(); /** * Create a new {@link Future} which is marked as successes already. So {@link Future#isSuccess()} diff --git a/common/src/main/java/io/netty/util/concurrent/GenericProgressiveFutureListener.java b/common/src/main/java/io/netty/util/concurrent/GenericProgressiveFutureListener.java index cef1c48c7f..17fef13ec2 100644 --- a/common/src/main/java/io/netty/util/concurrent/GenericProgressiveFutureListener.java +++ b/common/src/main/java/io/netty/util/concurrent/GenericProgressiveFutureListener.java @@ -17,5 +17,5 @@ package io.netty.util.concurrent; public interface GenericProgressiveFutureListener> extends GenericFutureListener { - void operationProgressed(F future, long delta) throws Exception; + void operationProgressed(F future, long progress, long total) throws Exception; } diff --git a/common/src/main/java/io/netty/util/concurrent/ProgressiveFuture.java b/common/src/main/java/io/netty/util/concurrent/ProgressiveFuture.java index 04e87f5bbf..1c903d52de 100644 --- a/common/src/main/java/io/netty/util/concurrent/ProgressiveFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/ProgressiveFuture.java @@ -21,16 +21,6 @@ package io.netty.util.concurrent; */ public interface ProgressiveFuture extends Future { - /** - * Returns the current progress of the operation as a positive long integer. - */ - long progress(); - - /** - * Returns the maximum progress of the operation that signifies the end of operation. - */ - long total(); - @Override ProgressiveFuture addListener(GenericFutureListener> listener); diff --git a/common/src/main/java/io/netty/util/concurrent/ProgressivePromise.java b/common/src/main/java/io/netty/util/concurrent/ProgressivePromise.java index 12e79a6393..1574404d2c 100644 --- a/common/src/main/java/io/netty/util/concurrent/ProgressivePromise.java +++ b/common/src/main/java/io/netty/util/concurrent/ProgressivePromise.java @@ -24,14 +24,14 @@ public interface ProgressivePromise extends Promise, ProgressiveFuture * Sets the current progress of the operation and notifies the listeners that implement * {@link GenericProgressiveFutureListener}. */ - ProgressivePromise setProgress(long progress); + ProgressivePromise setProgress(long progress, long total); /** * Tries to set the current progress of the operation and notifies the listeners that implement * {@link GenericProgressiveFutureListener}. If the operation is already complete or the progress is out of range, * this method does nothing but returning {@code false}. */ - boolean tryProgress(long progress); + boolean tryProgress(long progress, long total); @Override ProgressivePromise setSuccess(V result); diff --git a/example/src/main/java/io/netty/example/filetransfer/FileServer.java b/example/src/main/java/io/netty/example/filetransfer/FileServer.java index 812e130f44..800bd25190 100644 --- a/example/src/main/java/io/netty/example/filetransfer/FileServer.java +++ b/example/src/main/java/io/netty/example/filetransfer/FileServer.java @@ -106,15 +106,15 @@ public class FileServer { } ctx.write(file + " " + file.length() + '\n'); FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length()); - ChannelProgressivePromise promise = ctx.newProgressivePromise(region.count()); + ChannelProgressivePromise promise = ctx.newProgressivePromise(); promise.addListener(new ChannelProgressiveFutureListener() { @Override - public void operationProgressed(ChannelProgressiveFuture f, long delta) throws Exception { - System.err.println("progress: " + f.progress() + " / " + f.total() + " (+" + delta + ')'); + public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) { + System.err.println("progress: " + progress + " / " + total); } @Override - public void operationComplete(ChannelProgressiveFuture future) throws Exception { + public void operationComplete(ChannelProgressiveFuture future) { System.err.println("file transfer complete"); } }); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 89698b756f..3ecc4dd290 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -307,8 +307,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public ChannelProgressivePromise newProgressivePromise(long total) { - return new DefaultChannelProgressivePromise(this, total); + public ChannelProgressivePromise newProgressivePromise() { + return new DefaultChannelProgressivePromise(this); } @Override diff --git a/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java b/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java index be62a85457..4b3744bd3f 100644 --- a/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java +++ b/transport/src/main/java/io/netty/channel/ChannelProgressivePromise.java @@ -58,5 +58,5 @@ public interface ChannelProgressivePromise extends ProgressivePromise, Cha ChannelProgressivePromise setFailure(Throwable cause); @Override - ChannelProgressivePromise setProgress(long progress); + ChannelProgressivePromise setProgress(long progress, long total); } diff --git a/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java b/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java index 9c18be93c8..5059bf15de 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java +++ b/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java @@ -43,7 +43,7 @@ interface ChannelPropertyAccess { /** * Return an new {@link ChannelProgressivePromise} */ - ChannelProgressivePromise newProgressivePromise(long total); + ChannelProgressivePromise newProgressivePromise(); /** * Create a new {@link ChannelFuture} which is marked as successes already. So {@link ChannelFuture#isSuccess()} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index e10396849d..9c695c87d2 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -1558,8 +1558,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } @Override - public ChannelProgressivePromise newProgressivePromise(long total) { - return new DefaultChannelProgressivePromise(channel(), executor(), total); + public ChannelProgressivePromise newProgressivePromise() { + return new DefaultChannelProgressivePromise(channel(), executor()); } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java b/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java index 3514dd388d..a0276fc23b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelProgressivePromise.java @@ -23,7 +23,7 @@ import io.netty.util.concurrent.GenericFutureListener; /** * The default {@link ChannelProgressivePromise} implementation. It is recommended to use - * {@link Channel#newProgressivePromise(long)} to create a new {@link ChannelProgressivePromise} rather than calling the + * {@link Channel#newProgressivePromise()} to create a new {@link ChannelProgressivePromise} rather than calling the * constructor explicitly. */ public class DefaultChannelProgressivePromise @@ -37,8 +37,7 @@ public class DefaultChannelProgressivePromise * @param channel * the {@link Channel} associated with this future */ - public DefaultChannelProgressivePromise(Channel channel, long total) { - super(total); + public DefaultChannelProgressivePromise(Channel channel) { this.channel = channel; } @@ -48,8 +47,8 @@ public class DefaultChannelProgressivePromise * @param channel * the {@link Channel} associated with this future */ - public DefaultChannelProgressivePromise(Channel channel, EventExecutor executor, long total) { - super(executor, total); + public DefaultChannelProgressivePromise(Channel channel, EventExecutor executor) { + super(executor); this.channel = channel; } @@ -91,8 +90,8 @@ public class DefaultChannelProgressivePromise } @Override - public ChannelProgressivePromise setProgress(long progress) { - super.setProgress(progress); + public ChannelProgressivePromise setProgress(long progress, long total) { + super.setProgress(progress, total); return this; } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 5b0e677dee..2d333432b7 100755 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -187,8 +187,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } else { writtenBytes += localWrittenBytes; if (promise instanceof ChannelProgressivePromise) { - final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise; - pp.setProgress(pp.progress() + localWrittenBytes); + ((ChannelProgressivePromise) promise).setProgress(writtenBytes, region.count()); } if (writtenBytes >= region.count()) { region.release(); diff --git a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java index ecf3a4cff6..0e91187108 100644 --- a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java @@ -112,8 +112,8 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel { if (outChannel == null) { outChannel = Channels.newChannel(os); } - long written = 0; + long written = 0; for (;;) { long localWritten = region.transferTo(outChannel, written); if (localWritten == -1) { @@ -125,7 +125,7 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel { written += localWritten; if (promise instanceof ChannelProgressivePromise) { final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise; - pp.setProgress(pp.progress() + localWritten); + pp.setProgress(written, region.count()); } if (written >= region.count()) { promise.setSuccess(); diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 27352f945e..e6aa84aa6c 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -564,8 +564,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne written += result; if (promise instanceof ChannelProgressivePromise) { - final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise; - pp.setProgress(pp.progress() + result); + ((ChannelProgressivePromise) promise).setProgress(written, region.count()); } if (written >= region.count()) {