Use progress + total instead of delta

.. because there is sometimes a task whose total is only a rough
estimation
This commit is contained in:
Trustin Lee 2013-04-15 20:11:02 +09:00
parent e69033a4c3
commit 7ee571968c
16 changed files with 44 additions and 84 deletions

View File

@ -43,8 +43,8 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
}
@Override
public <V> ProgressivePromise<V> newProgressivePromise(long total) {
return new DefaultProgressivePromise<V>(this, total);
public <V> ProgressivePromise<V> newProgressivePromise() {
return new DefaultProgressivePromise<V>(this);
}
@Override

View File

@ -18,70 +18,42 @@ package io.netty.util.concurrent;
public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements ProgressivePromise<V> {
private final long total;
private volatile long progress;
/**
* Creates a new instance.
*
* It is preferable to use {@link EventExecutor#newProgressivePromise(long)} to create a new progressive promise
* It is preferable to use {@link EventExecutor#newProgressivePromise()} to create a new progressive promise
*
* @param executor
* the {@link EventExecutor} which is used to notify the promise when it progresses or it is complete
*/
public DefaultProgressivePromise(EventExecutor executor, long total) {
public DefaultProgressivePromise(EventExecutor executor) {
super(executor);
validateTotal(total);
this.total = total;
}
protected DefaultProgressivePromise(long total) {
/* only for subclasses */
validateTotal(total);
this.total = total;
}
private static void validateTotal(long total) {
if (total < 0) {
throw new IllegalArgumentException("total: " + total + " (expected: >= 0)");
}
}
protected DefaultProgressivePromise() { /* only for subclasses */ }
@Override
public long progress() {
return progress;
}
@Override
public long total() {
return total;
}
@Override
public ProgressivePromise<V> setProgress(long progress) {
public ProgressivePromise<V> setProgress(long progress, long total) {
if (progress < 0 || progress > total) {
throw new IllegalArgumentException(
"progress: " + progress + " (expected: 0 <= progress <= " + total + ')');
"progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))");
}
if (isDone()) {
throw new IllegalStateException("complete already");
}
long oldProgress = this.progress;
this.progress = progress;
notifyProgressiveListeners(progress - oldProgress);
notifyProgressiveListeners(progress, total);
return this;
}
@Override
public boolean tryProgress(long progress) {
public boolean tryProgress(long progress, long total) {
if (progress < 0 || progress > total || isDone()) {
return false;
}
this.progress = progress;
notifyProgressiveListeners(progress);
notifyProgressiveListeners(progress, total);
return true;
}

View File

@ -605,7 +605,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
void notifyProgressiveListeners(final long delta) {
void notifyProgressiveListeners(final long progress, final long total) {
final Object listeners = progressiveListeners();
if (listeners == null) {
return;
@ -616,10 +616,11 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
if (listeners instanceof GenericProgressiveFutureListener[]) {
notifyProgressiveListeners0(self, (GenericProgressiveFutureListener<?>[]) listeners, delta);
notifyProgressiveListeners0(
self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
} else {
notifyProgressiveListener0(
self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, delta);
self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
}
} else {
try {
@ -629,7 +630,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
executor.execute(new Runnable() {
@Override
public void run() {
notifyProgressiveListeners0(self, array, delta);
notifyProgressiveListeners0(self, array, progress, total);
}
});
} else {
@ -638,7 +639,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
executor.execute(new Runnable() {
@Override
public void run() {
notifyProgressiveListener0(self, l, delta);
notifyProgressiveListener0(self, l, progress, total);
}
});
}
@ -649,20 +650,20 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
private static void notifyProgressiveListeners0(
ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long delta) {
ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
for (GenericProgressiveFutureListener<?> l: listeners) {
if (l == null) {
break;
}
notifyProgressiveListener0(future, l, delta);
notifyProgressiveListener0(future, l, progress, total);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyProgressiveListener0(
ProgressiveFuture future, GenericProgressiveFutureListener l, long delta) {
ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
try {
l.operationProgressed(future, delta);
l.operationProgressed(future, progress, total);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);

View File

@ -54,7 +54,7 @@ public interface EventExecutor extends EventExecutorGroup {
/**
* Create a new {@link ProgressivePromise}.
*/
<V> ProgressivePromise<V> newProgressivePromise(long total);
<V> ProgressivePromise<V> newProgressivePromise();
/**
* Create a new {@link Future} which is marked as successes already. So {@link Future#isSuccess()}

View File

@ -17,5 +17,5 @@
package io.netty.util.concurrent;
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
void operationProgressed(F future, long delta) throws Exception;
void operationProgressed(F future, long progress, long total) throws Exception;
}

View File

@ -21,16 +21,6 @@ package io.netty.util.concurrent;
*/
public interface ProgressiveFuture<V> extends Future<V> {
/**
* Returns the current progress of the operation as a positive long integer.
*/
long progress();
/**
* Returns the maximum progress of the operation that signifies the end of operation.
*/
long total();
@Override
ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<V>> listener);

View File

@ -24,14 +24,14 @@ public interface ProgressivePromise<V> extends Promise<V>, ProgressiveFuture<V>
* Sets the current progress of the operation and notifies the listeners that implement
* {@link GenericProgressiveFutureListener}.
*/
ProgressivePromise<V> setProgress(long progress);
ProgressivePromise<V> setProgress(long progress, long total);
/**
* Tries to set the current progress of the operation and notifies the listeners that implement
* {@link GenericProgressiveFutureListener}. If the operation is already complete or the progress is out of range,
* this method does nothing but returning {@code false}.
*/
boolean tryProgress(long progress);
boolean tryProgress(long progress, long total);
@Override
ProgressivePromise<V> setSuccess(V result);

View File

@ -106,15 +106,15 @@ public class FileServer {
}
ctx.write(file + " " + file.length() + '\n');
FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0, file.length());
ChannelProgressivePromise promise = ctx.newProgressivePromise(region.count());
ChannelProgressivePromise promise = ctx.newProgressivePromise();
promise.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture f, long delta) throws Exception {
System.err.println("progress: " + f.progress() + " / " + f.total() + " (+" + delta + ')');
public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) {
System.err.println("progress: " + progress + " / " + total);
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
public void operationComplete(ChannelProgressiveFuture future) {
System.err.println("file transfer complete");
}
});

View File

@ -307,8 +307,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public ChannelProgressivePromise newProgressivePromise(long total) {
return new DefaultChannelProgressivePromise(this, total);
public ChannelProgressivePromise newProgressivePromise() {
return new DefaultChannelProgressivePromise(this);
}
@Override

View File

@ -58,5 +58,5 @@ public interface ChannelProgressivePromise extends ProgressivePromise<Void>, Cha
ChannelProgressivePromise setFailure(Throwable cause);
@Override
ChannelProgressivePromise setProgress(long progress);
ChannelProgressivePromise setProgress(long progress, long total);
}

View File

@ -43,7 +43,7 @@ interface ChannelPropertyAccess {
/**
* Return an new {@link ChannelProgressivePromise}
*/
ChannelProgressivePromise newProgressivePromise(long total);
ChannelProgressivePromise newProgressivePromise();
/**
* Create a new {@link ChannelFuture} which is marked as successes already. So {@link ChannelFuture#isSuccess()}

View File

@ -1558,8 +1558,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
@Override
public ChannelProgressivePromise newProgressivePromise(long total) {
return new DefaultChannelProgressivePromise(channel(), executor(), total);
public ChannelProgressivePromise newProgressivePromise() {
return new DefaultChannelProgressivePromise(channel(), executor());
}
@Override

View File

@ -23,7 +23,7 @@ import io.netty.util.concurrent.GenericFutureListener;
/**
* The default {@link ChannelProgressivePromise} implementation. It is recommended to use
* {@link Channel#newProgressivePromise(long)} to create a new {@link ChannelProgressivePromise} rather than calling the
* {@link Channel#newProgressivePromise()} to create a new {@link ChannelProgressivePromise} rather than calling the
* constructor explicitly.
*/
public class DefaultChannelProgressivePromise
@ -37,8 +37,7 @@ public class DefaultChannelProgressivePromise
* @param channel
* the {@link Channel} associated with this future
*/
public DefaultChannelProgressivePromise(Channel channel, long total) {
super(total);
public DefaultChannelProgressivePromise(Channel channel) {
this.channel = channel;
}
@ -48,8 +47,8 @@ public class DefaultChannelProgressivePromise
* @param channel
* the {@link Channel} associated with this future
*/
public DefaultChannelProgressivePromise(Channel channel, EventExecutor executor, long total) {
super(executor, total);
public DefaultChannelProgressivePromise(Channel channel, EventExecutor executor) {
super(executor);
this.channel = channel;
}
@ -91,8 +90,8 @@ public class DefaultChannelProgressivePromise
}
@Override
public ChannelProgressivePromise setProgress(long progress) {
super.setProgress(progress);
public ChannelProgressivePromise setProgress(long progress, long total) {
super.setProgress(progress, total);
return this;
}

View File

@ -187,8 +187,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} else {
writtenBytes += localWrittenBytes;
if (promise instanceof ChannelProgressivePromise) {
final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise;
pp.setProgress(pp.progress() + localWrittenBytes);
((ChannelProgressivePromise) promise).setProgress(writtenBytes, region.count());
}
if (writtenBytes >= region.count()) {
region.release();

View File

@ -112,8 +112,8 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
if (outChannel == null) {
outChannel = Channels.newChannel(os);
}
long written = 0;
long written = 0;
for (;;) {
long localWritten = region.transferTo(outChannel, written);
if (localWritten == -1) {
@ -125,7 +125,7 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
written += localWritten;
if (promise instanceof ChannelProgressivePromise) {
final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise;
pp.setProgress(pp.progress() + localWritten);
pp.setProgress(written, region.count());
}
if (written >= region.count()) {
promise.setSuccess();

View File

@ -564,8 +564,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
written += result;
if (promise instanceof ChannelProgressivePromise) {
final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise;
pp.setProgress(pp.progress() + result);
((ChannelProgressivePromise) promise).setProgress(written, region.count());
}
if (written >= region.count()) {