Implement cancellation properly for Promise/Future

- Related issue: #1432
- Add Future.isCancellable()
- Add Promise.setUncancellable() which is meant to be used for the party that runs the task uncancellable once started
- Implement Future.isCancelled() and Promise.cancel(boolean) properly
This commit is contained in:
Trustin Lee 2013-06-11 17:46:21 +09:00
parent 85afdda3ce
commit 41af9a1eb3
8 changed files with 139 additions and 64 deletions

View File

@ -26,16 +26,6 @@ import java.util.concurrent.TimeoutException;
*/ */
public abstract class AbstractFuture<V> implements Future<V> { public abstract class AbstractFuture<V> implements Future<V> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override @Override
public V get() throws InterruptedException, ExecutionException { public V get() throws InterruptedException, ExecutionException {
await(); await();

View File

@ -126,4 +126,19 @@ public abstract class CompleteFuture<V> extends AbstractFuture<V> {
public boolean isDone() { public boolean isDone() {
return true; return true;
} }
@Override
public boolean isCancellable() {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
} }

View File

@ -20,6 +20,7 @@ import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*; import static java.util.concurrent.TimeUnit.*;
@ -38,6 +39,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} }
}; };
private static final Signal SUCCESS = new Signal(DefaultPromise.class.getName() + ".SUCCESS"); private static final Signal SUCCESS = new Signal(DefaultPromise.class.getName() + ".SUCCESS");
private static final Signal UNCANCELLABLE = new Signal(DefaultPromise.class.getName() + ".UNCANCELLABLE");
private final EventExecutor executor; private final EventExecutor executor;
private volatile Object result; private volatile Object result;
@ -69,15 +71,33 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
return executor; return executor;
} }
@Override
public boolean isCancelled() {
return isCancelled0(result);
}
private static boolean isCancelled0(Object result) {
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}
@Override
public boolean isCancellable() {
return result == null;
}
@Override @Override
public boolean isDone() { public boolean isDone() {
return result != null; return isDone0(result);
}
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
} }
@Override @Override
public boolean isSuccess() { public boolean isSuccess() {
Object result = this.result; Object result = this.result;
if (result == null) { if (result == null || result == UNCANCELLABLE) {
return false; return false;
} }
return !(result instanceof CauseHolder); return !(result instanceof CauseHolder);
@ -85,9 +105,9 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
@Override @Override
public Throwable cause() { public Throwable cause() {
Object cause = result; Object result = this.result;
if (cause instanceof CauseHolder) { if (result instanceof CauseHolder) {
return ((CauseHolder) cause).cause; return ((CauseHolder) result).cause;
} }
return null; return null;
} }
@ -389,6 +409,47 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
return false; return false;
} }
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
Object result = this.result;
if (isDone0(result) || result == UNCANCELLABLE) {
return false;
}
synchronized (this) {
// Allow only once.
result = this.result;
if (isDone0(result) || result == UNCANCELLABLE) {
return false;
}
this.result = new CauseHolder(new CancellationException());
if (hasWaiters()) {
notifyAll();
}
}
return true;
}
@Override
public boolean setUncancellable() {
Object result = this.result;
if (isDone0(result)) {
return isCancelled0(result);
}
synchronized (this) {
// Allow only once.
result = this.result;
if (isDone0(result)) {
return isCancelled0(result);
}
this.result = UNCANCELLABLE;
}
return true;
}
private boolean setFailure0(Throwable cause) { private boolean setFailure0(Throwable cause) {
if (isDone()) { if (isDone()) {
return false; return false;

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
/** /**
* The result of an asynchronous operation. * The result of an asynchronous operation.
*/ */
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> { public interface Future<V> extends java.util.concurrent.Future<V> {
/** /**
@ -30,6 +31,11 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
*/ */
boolean isSuccess(); boolean isSuccess();
/**
* returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
*/
boolean isCancellable();
/** /**
* Returns the cause of the failed I/O operation if the I/O operation has * Returns the cause of the failed I/O operation if the I/O operation has
* failed. * failed.

View File

@ -56,6 +56,14 @@ public interface Promise<V> extends Future<V> {
*/ */
boolean tryFailure(Throwable cause); boolean tryFailure(Throwable cause);
/**
* Make this future impossible to cancel.
*
* @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
* without being cancelled. {@code false} if this future has been cancelled already.
*/
boolean setUncancellable();
@Override @Override
Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener); Promise<V> addListener(GenericFutureListener<? extends Future<V>> listener);

View File

@ -20,6 +20,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture; import java.util.concurrent.RunnableFuture;
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> { class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
protected final Callable<V> task; protected final Callable<V> task;
PromiseTask(EventExecutor executor, Runnable runnable, V result) { PromiseTask(EventExecutor executor, Runnable runnable, V result) {
@ -32,12 +33,12 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
@Override @Override
public int hashCode() { public final int hashCode() {
return System.identityHashCode(this); return System.identityHashCode(this);
} }
@Override @Override
public boolean equals(Object obj) { public final boolean equals(Object obj) {
return this == obj; return this == obj;
} }
@ -52,7 +53,7 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
@Override @Override
public Promise<V> setFailure(Throwable cause) { public final Promise<V> setFailure(Throwable cause) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
@ -62,7 +63,7 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
@Override @Override
public boolean tryFailure(Throwable cause) { public final boolean tryFailure(Throwable cause) {
return false; return false;
} }
@ -71,7 +72,7 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
@Override @Override
public Promise<V> setSuccess(V result) { public final Promise<V> setSuccess(V result) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
@ -81,11 +82,20 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
@Override @Override
public boolean trySuccess(V result) { public final boolean trySuccess(V result) {
return false; return false;
} }
protected final boolean trySuccessInternal(V result) { protected final boolean trySuccessInternal(V result) {
return super.trySuccess(result); return super.trySuccess(result);
} }
@Override
public final boolean setUncancellable() {
throw new IllegalStateException();
}
protected final boolean setUncancellableInternal() {
return super.setUncancellable();
}
} }

View File

@ -27,7 +27,6 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -35,7 +34,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -829,18 +827,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} }
} }
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
private static final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> { private static final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<ScheduledFutureTask> uncancellableUpdater =
AtomicIntegerFieldUpdater.newUpdater(ScheduledFutureTask.class, "uncancellable");
private final long id = nextTaskId.getAndIncrement(); private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos; private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos; private final long periodNanos;
@SuppressWarnings("UnusedDeclaration")
private volatile int uncancellable;
ScheduledFutureTask(SingleThreadEventExecutor executor, Runnable runnable, V result, long nanoTime) { ScheduledFutureTask(SingleThreadEventExecutor executor, Runnable runnable, V result, long nanoTime) {
this(executor, Executors.callable(runnable, result), nanoTime); this(executor, Executors.callable(runnable, result), nanoTime);
@ -904,28 +897,18 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} }
} }
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override @Override
public void run() { public void run() {
assert executor().inEventLoop(); assert executor().inEventLoop();
try { try {
if (periodNanos == 0) { if (periodNanos == 0) {
if (setUncancellable()) { if (setUncancellableInternal()) {
V result = task.call(); V result = task.call();
setSuccessInternal(result); setSuccessInternal(result);
} }
} else { } else {
// check if is done as it may was cancelled // check if is done as it may was cancelled
if (!isDone()) { if (!isCancelled()) {
task.call(); task.call();
if (!executor().isShutdown()) { if (!executor().isShutdown()) {
long p = periodNanos; long p = periodNanos;
@ -934,7 +917,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} else { } else {
deadlineNanos = nanoTime() - p; deadlineNanos = nanoTime() - p;
} }
if (!isDone()) { if (!isCancelled()) {
executor().delayedTaskQueue.add(this); executor().delayedTaskQueue.add(this);
} }
} }
@ -944,28 +927,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
setFailureInternal(cause); setFailureInternal(cause);
} }
} }
@Override
public boolean isCancelled() {
if (cause() instanceof CancellationException) {
return true;
}
return false;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!isDone()) {
if (setUncancellable()) {
return tryFailureInternal(new CancellationException());
}
}
return false;
}
private boolean setUncancellable() {
return uncancellableUpdater.compareAndSet(this, 0, 1);
}
} }
private final class PurgeTask implements Runnable { private final class PurgeTask implements Runnable {

View File

@ -19,6 +19,7 @@ import io.netty.util.concurrent.AbstractFuture;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise { final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise {
@ -116,6 +117,21 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPr
return false; return false;
} }
@Override
public boolean setUncancellable() {
return true;
}
@Override
public boolean isCancellable() {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override @Override
public Throwable cause() { public Throwable cause() {
return null; return null;
@ -153,6 +169,14 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPr
return false; return false;
} }
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (fireException) {
channel.pipeline().fireExceptionCaught(new CancellationException());
}
return false;
}
@Override @Override
public boolean trySuccess() { public boolean trySuccess() {
return false; return false;