From 41af9a1eb3f9e5c814cc668736a006ee2593b325 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 11 Jun 2013 17:46:21 +0900 Subject: [PATCH] Implement cancellation properly for Promise/Future - Related issue: #1432 - Add Future.isCancellable() - Add Promise.setUncancellable() which is meant to be used for the party that runs the task uncancellable once started - Implement Future.isCancelled() and Promise.cancel(boolean) properly --- .../netty/util/concurrent/AbstractFuture.java | 10 --- .../netty/util/concurrent/CompleteFuture.java | 15 ++++ .../netty/util/concurrent/DefaultPromise.java | 71 +++++++++++++++++-- .../java/io/netty/util/concurrent/Future.java | 6 ++ .../io/netty/util/concurrent/Promise.java | 8 +++ .../io/netty/util/concurrent/PromiseTask.java | 22 ++++-- .../concurrent/SingleThreadEventExecutor.java | 47 ++---------- .../io/netty/channel/VoidChannelPromise.java | 24 +++++++ 8 files changed, 139 insertions(+), 64 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractFuture.java b/common/src/main/java/io/netty/util/concurrent/AbstractFuture.java index 9ee3831a1d..b6073b69cd 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractFuture.java @@ -26,16 +26,6 @@ import java.util.concurrent.TimeoutException; */ public abstract class AbstractFuture implements Future { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - @Override public V get() throws InterruptedException, ExecutionException { await(); diff --git a/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java b/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java index 97a9810bd8..a00f15abf7 100644 --- a/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java @@ -126,4 +126,19 @@ public abstract class CompleteFuture extends AbstractFuture { public boolean isDone() { return true; } + + @Override + public boolean isCancellable() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 134d468972..baa51c5868 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -20,6 +20,7 @@ import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.*; @@ -38,6 +39,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } }; private static final Signal SUCCESS = new Signal(DefaultPromise.class.getName() + ".SUCCESS"); + private static final Signal UNCANCELLABLE = new Signal(DefaultPromise.class.getName() + ".UNCANCELLABLE"); private final EventExecutor executor; private volatile Object result; @@ -69,15 +71,33 @@ public class DefaultPromise extends AbstractFuture implements Promise { return executor; } + @Override + public boolean isCancelled() { + return isCancelled0(result); + } + + private static boolean isCancelled0(Object result) { + return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; + } + + @Override + public boolean isCancellable() { + return result == null; + } + @Override public boolean isDone() { - return result != null; + return isDone0(result); + } + + private static boolean isDone0(Object result) { + return result != null && result != UNCANCELLABLE; } @Override public boolean isSuccess() { Object result = this.result; - if (result == null) { + if (result == null || result == UNCANCELLABLE) { return false; } return !(result instanceof CauseHolder); @@ -85,9 +105,9 @@ public class DefaultPromise extends AbstractFuture implements Promise { @Override public Throwable cause() { - Object cause = result; - if (cause instanceof CauseHolder) { - return ((CauseHolder) cause).cause; + Object result = this.result; + if (result instanceof CauseHolder) { + return ((CauseHolder) result).cause; } return null; } @@ -389,6 +409,47 @@ public class DefaultPromise extends AbstractFuture implements Promise { return false; } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + Object result = this.result; + if (isDone0(result) || result == UNCANCELLABLE) { + return false; + } + + synchronized (this) { + // Allow only once. + result = this.result; + if (isDone0(result) || result == UNCANCELLABLE) { + return false; + } + + this.result = new CauseHolder(new CancellationException()); + if (hasWaiters()) { + notifyAll(); + } + } + return true; + } + + @Override + public boolean setUncancellable() { + Object result = this.result; + if (isDone0(result)) { + return isCancelled0(result); + } + + synchronized (this) { + // Allow only once. + result = this.result; + if (isDone0(result)) { + return isCancelled0(result); + } + + this.result = UNCANCELLABLE; + } + return true; + } + private boolean setFailure0(Throwable cause) { if (isDone()) { return false; diff --git a/common/src/main/java/io/netty/util/concurrent/Future.java b/common/src/main/java/io/netty/util/concurrent/Future.java index f6d1c624fd..1ee845ff31 100644 --- a/common/src/main/java/io/netty/util/concurrent/Future.java +++ b/common/src/main/java/io/netty/util/concurrent/Future.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; /** * The result of an asynchronous operation. */ +@SuppressWarnings("ClassNameSameAsAncestorName") public interface Future extends java.util.concurrent.Future { /** @@ -30,6 +31,11 @@ public interface Future extends java.util.concurrent.Future { */ boolean isSuccess(); + /** + * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}. + */ + boolean isCancellable(); + /** * Returns the cause of the failed I/O operation if the I/O operation has * failed. diff --git a/common/src/main/java/io/netty/util/concurrent/Promise.java b/common/src/main/java/io/netty/util/concurrent/Promise.java index 3eda72734f..e8360e17be 100644 --- a/common/src/main/java/io/netty/util/concurrent/Promise.java +++ b/common/src/main/java/io/netty/util/concurrent/Promise.java @@ -56,6 +56,14 @@ public interface Promise extends Future { */ boolean tryFailure(Throwable cause); + /** + * Make this future impossible to cancel. + * + * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done + * without being cancelled. {@code false} if this future has been cancelled already. + */ + boolean setUncancellable(); + @Override Promise addListener(GenericFutureListener> listener); diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java index 0f2d8ac752..ccb8bf1b20 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RunnableFuture; class PromiseTask extends DefaultPromise implements RunnableFuture { + protected final Callable task; PromiseTask(EventExecutor executor, Runnable runnable, V result) { @@ -32,12 +33,12 @@ class PromiseTask extends DefaultPromise implements RunnableFuture { } @Override - public int hashCode() { + public final int hashCode() { return System.identityHashCode(this); } @Override - public boolean equals(Object obj) { + public final boolean equals(Object obj) { return this == obj; } @@ -52,7 +53,7 @@ class PromiseTask extends DefaultPromise implements RunnableFuture { } @Override - public Promise setFailure(Throwable cause) { + public final Promise setFailure(Throwable cause) { throw new IllegalStateException(); } @@ -62,7 +63,7 @@ class PromiseTask extends DefaultPromise implements RunnableFuture { } @Override - public boolean tryFailure(Throwable cause) { + public final boolean tryFailure(Throwable cause) { return false; } @@ -71,7 +72,7 @@ class PromiseTask extends DefaultPromise implements RunnableFuture { } @Override - public Promise setSuccess(V result) { + public final Promise setSuccess(V result) { throw new IllegalStateException(); } @@ -81,11 +82,20 @@ class PromiseTask extends DefaultPromise implements RunnableFuture { } @Override - public boolean trySuccess(V result) { + public final boolean trySuccess(V result) { return false; } protected final boolean trySuccessInternal(V result) { return super.trySuccess(result); } + + @Override + public final boolean setUncancellable() { + throw new IllegalStateException(); + } + + protected final boolean setUncancellableInternal() { + return super.setUncancellable(); + } } diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 403aefa5d9..8cd62e0513 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -27,7 +27,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; import java.util.concurrent.Delayed; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -35,7 +34,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; /** @@ -829,18 +827,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } } + @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") private static final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture { - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater uncancellableUpdater = - AtomicIntegerFieldUpdater.newUpdater(ScheduledFutureTask.class, "uncancellable"); - private final long id = nextTaskId.getAndIncrement(); private long deadlineNanos; /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ private final long periodNanos; - @SuppressWarnings("UnusedDeclaration") - private volatile int uncancellable; ScheduledFutureTask(SingleThreadEventExecutor executor, Runnable runnable, V result, long nanoTime) { this(executor, Executors.callable(runnable, result), nanoTime); @@ -904,28 +897,18 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } } - @Override - public int hashCode() { - return super.hashCode(); - } - - @Override - public boolean equals(Object obj) { - return super.equals(obj); - } - @Override public void run() { assert executor().inEventLoop(); try { if (periodNanos == 0) { - if (setUncancellable()) { + if (setUncancellableInternal()) { V result = task.call(); setSuccessInternal(result); } } else { // check if is done as it may was cancelled - if (!isDone()) { + if (!isCancelled()) { task.call(); if (!executor().isShutdown()) { long p = periodNanos; @@ -934,7 +917,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } else { deadlineNanos = nanoTime() - p; } - if (!isDone()) { + if (!isCancelled()) { executor().delayedTaskQueue.add(this); } } @@ -944,28 +927,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { setFailureInternal(cause); } } - - @Override - public boolean isCancelled() { - if (cause() instanceof CancellationException) { - return true; - } - return false; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (!isDone()) { - if (setUncancellable()) { - return tryFailureInternal(new CancellationException()); - } - } - return false; - } - - private boolean setUncancellable() { - return uncancellableUpdater.compareAndSet(this, 0, 1); - } } private final class PurgeTask implements Runnable { diff --git a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java index 137ac47a81..0250f2e729 100644 --- a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java @@ -19,6 +19,7 @@ import io.netty.util.concurrent.AbstractFuture; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; final class VoidChannelPromise extends AbstractFuture implements ChannelPromise { @@ -116,6 +117,21 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelPr return false; } + @Override + public boolean setUncancellable() { + return true; + } + + @Override + public boolean isCancellable() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + @Override public Throwable cause() { return null; @@ -153,6 +169,14 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelPr return false; } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (fireException) { + channel.pipeline().fireExceptionCaught(new CancellationException()); + } + return false; + } + @Override public boolean trySuccess() { return false;