Port ConcurrentHashMapV8 again to get the recent upstream fixes

This commit is contained in:
Trustin Lee 2013-07-07 12:22:59 +09:00
parent da5a5af520
commit 378626b31f
7 changed files with 5040 additions and 5736 deletions

View File

@ -22,20 +22,23 @@
package io.netty.util.internal.chmv8; package io.netty.util.internal.chmv8;
import java.util.concurrent.RecursiveAction;
/** /**
* A {@link ForkJoinTask} with a completion action performed when * A {@link ForkJoinTask} with a completion action performed when
* triggered and there are no remaining pending * triggered and there are no remaining pending actions.
* actions. CountedCompleters are in general more robust in the * CountedCompleters are in general more robust in the
* presence of subtask stalls and blockage than are other forms of * presence of subtask stalls and blockage than are other forms of
* ForkJoinTasks, but are less intuitive to program. Uses of * ForkJoinTasks, but are less intuitive to program. Uses of
* CountedCompleter are similar to those of other completion based * CountedCompleter are similar to those of other completion based
* components (such as {@link java.nio.channels.CompletionHandler}) * components (such as {@link java.nio.channels.CompletionHandler})
* except that multiple <em>pending</em> completions may be necessary * except that multiple <em>pending</em> completions may be necessary
* to trigger the {@link #onCompletion} action, not just one. Unless * to trigger the completion action {@link #onCompletion(CountedCompleter)},
* initialized otherwise, the {@link #getPendingCount pending count} * not just one.
* starts at zero, but may be (atomically) changed using methods * Unless initialized otherwise, the {@linkplain #getPendingCount pending
* {@link #setPendingCount}, {@link #addToPendingCount}, and {@link * count} starts at zero, but may be (atomically) changed using
* #compareAndSetPendingCount}. Upon invocation of {@link * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
* {@link #compareAndSetPendingCount}. Upon invocation of {@link
* #tryComplete}, if the pending action count is nonzero, it is * #tryComplete}, if the pending action count is nonzero, it is
* decremented; otherwise, the completion action is performed, and if * decremented; otherwise, the completion action is performed, and if
* this completer itself has a completer, the process is continued * this completer itself has a completer, the process is continued
@ -56,9 +59,10 @@ package io.netty.util.internal.chmv8;
* <p>A concrete CountedCompleter class must define method {@link * <p>A concrete CountedCompleter class must define method {@link
* #compute}, that should in most cases (as illustrated below), invoke * #compute}, that should in most cases (as illustrated below), invoke
* {@code tryComplete()} once before returning. The class may also * {@code tryComplete()} once before returning. The class may also
* optionally override method {@link #onCompletion} to perform an * optionally override method {@link #onCompletion(CountedCompleter)}
* action upon normal completion, and method {@link * to perform an action upon normal completion, and method
* #onExceptionalCompletion} to perform an action upon any exception. * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
* perform an action upon any exception.
* *
* <p>CountedCompleters most often do not bear results, in which case * <p>CountedCompleters most often do not bear results, in which case
* they are normally declared as {@code CountedCompleter<Void>}, and * they are normally declared as {@code CountedCompleter<Void>}, and
@ -79,13 +83,14 @@ package io.netty.util.internal.chmv8;
* only as an internal helper for other computations, so its own task * only as an internal helper for other computations, so its own task
* status (as reported in methods such as {@link ForkJoinTask#isDone}) * status (as reported in methods such as {@link ForkJoinTask#isDone})
* is arbitrary; this status changes only upon explicit invocations of * is arbitrary; this status changes only upon explicit invocations of
* {@link #complete}, {@link ForkJoinTask#cancel}, {@link * {@link #complete}, {@link ForkJoinTask#cancel},
* ForkJoinTask#completeExceptionally} or upon exceptional completion * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
* of method {@code compute}. Upon any exceptional completion, the * exceptional completion of method {@code compute}. Upon any
* exception may be relayed to a task's completer (and its completer, * exceptional completion, the exception may be relayed to a task's
* and so on), if one exists and it has not otherwise already * completer (and its completer, and so on), if one exists and it has
* completed. Similarly, cancelling an internal CountedCompleter has * not otherwise already completed. Similarly, cancelling an internal
* only a local effect on that completer, so is not often useful. * CountedCompleter has only a local effect on that completer, so is
* not often useful.
* *
* <p><b>Sample Usages.</b> * <p><b>Sample Usages.</b>
* *
@ -112,8 +117,8 @@ package io.netty.util.internal.chmv8;
* improve load balancing. In the recursive case, the second of each * improve load balancing. In the recursive case, the second of each
* pair of subtasks to finish triggers completion of its parent * pair of subtasks to finish triggers completion of its parent
* (because no result combination is performed, the default no-op * (because no result combination is performed, the default no-op
* implementation of method {@code onCompletion} is not overridden). A * implementation of method {@code onCompletion} is not overridden).
* static utility method sets up the base task and invokes it * A static utility method sets up the base task and invokes it
* (here, implicitly using the {@link ForkJoinPool#commonPool()}). * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
* *
* <pre> {@code * <pre> {@code
@ -168,12 +173,11 @@ package io.netty.util.internal.chmv8;
* } * }
* }</pre> * }</pre>
* *
* As a further improvement, notice that the left task need not even * As a further improvement, notice that the left task need not even exist.
* exist. Instead of creating a new one, we can iterate using the * Instead of creating a new one, we can iterate using the original task,
* original task, and add a pending count for each fork. Additionally, * and add a pending count for each fork. Additionally, because no task
* because no task in this tree implements an {@link #onCompletion} * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
* method, {@code tryComplete()} can be replaced with {@link * {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
* #propagateCompletion}.
* *
* <pre> {@code * <pre> {@code
* class ForEach<E> ... * class ForEach<E> ...
@ -251,7 +255,7 @@ package io.netty.util.internal.chmv8;
* *
* <p><b>Recording subtasks.</b> CountedCompleter tasks that combine * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
* results of multiple subtasks usually need to access these results * results of multiple subtasks usually need to access these results
* in method {@link #onCompletion}. As illustrated in the following * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
* class (that performs a simplified form of map-reduce where mappings * class (that performs a simplified form of map-reduce where mappings
* and reductions are all of type {@code E}), one way to do this in * and reductions are all of type {@code E}), one way to do this in
* divide and conquer designs is to have each subtask record its * divide and conquer designs is to have each subtask record its
@ -352,7 +356,7 @@ package io.netty.util.internal.chmv8;
* while (h - l >= 2) { * while (h - l >= 2) {
* int mid = (l + h) >>> 1; * int mid = (l + h) >>> 1;
* addToPendingCount(1); * addToPendingCount(1);
* (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork; * (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
* h = mid; * h = mid;
* } * }
* if (h > l) * if (h > l)
@ -373,7 +377,7 @@ package io.netty.util.internal.chmv8;
* *
* <p><b>Triggers.</b> Some CountedCompleters are themselves never * <p><b>Triggers.</b> Some CountedCompleters are themselves never
* forked, but instead serve as bits of plumbing in other designs; * forked, but instead serve as bits of plumbing in other designs;
* including those in which the completion of one of more async tasks * including those in which the completion of one or more async tasks
* triggers another async task. For example: * triggers another async task. For example:
* *
* <pre> {@code * <pre> {@code
@ -394,7 +398,7 @@ package io.netty.util.internal.chmv8;
* @author Doug Lea * @author Doug Lea
*/ */
@SuppressWarnings("all") @SuppressWarnings("all")
abstract class CountedCompleter<T> extends ForkJoinTask<T> { public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
private static final long serialVersionUID = 5232453752276485070L; private static final long serialVersionUID = 5232453752276485070L;
/** This task's completer, or null if none */ /** This task's completer, or null if none */
@ -454,20 +458,21 @@ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
} }
/** /**
* Performs an action when method {@link #completeExceptionally} * Performs an action when method {@link
* is invoked or method {@link #compute} throws an exception, and * #completeExceptionally(Throwable)} is invoked or method {@link
* this task has not otherwise already completed normally. On * #compute} throws an exception, and this task has not already
* entry to this method, this task {@link * otherwise completed normally. On entry to this method, this task
* ForkJoinTask#isCompletedAbnormally}. The return value of this * {@link ForkJoinTask#isCompletedAbnormally}. The return value
* method controls further propagation: If {@code true} and this * of this method controls further propagation: If {@code true}
* task has a completer, then this completer is also completed * and this task has a completer that has not completed, then that
* exceptionally. The default implementation of this method does * completer is also completed exceptionally, with the same
* nothing except return {@code true}. * exception as this completer. The default implementation of
* this method does nothing except return {@code true}.
* *
* @param ex the exception * @param ex the exception
* @param caller the task invoking this method (which may * @param caller the task invoking this method (which may
* be this task itself) * be this task itself)
* @return true if this exception should be propagated to this * @return {@code true} if this exception should be propagated to this
* task's completer, if one exists * task's completer, if one exists
*/ */
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) { public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
@ -508,7 +513,7 @@ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
* @param delta the value to add * @param delta the value to add
*/ */
public final void addToPendingCount(int delta) { public final void addToPendingCount(int delta) {
int c; // note: can replace with intrinsic in jdk8 int c;
do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta)); do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
} }
@ -518,7 +523,7 @@ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
* *
* @param expected the expected value * @param expected the expected value
* @param count the new value * @param count the new value
* @return true if successful * @return {@code true} if successful
*/ */
public final boolean compareAndSetPendingCount(int expected, int count) { public final boolean compareAndSetPendingCount(int expected, int count) {
return U.compareAndSwapInt(this, PENDING, expected, count); return U.compareAndSwapInt(this, PENDING, expected, count);
@ -552,9 +557,9 @@ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
/** /**
* If the pending count is nonzero, decrements the count; * If the pending count is nonzero, decrements the count;
* otherwise invokes {@link #onCompletion} and then similarly * otherwise invokes {@link #onCompletion(CountedCompleter)}
* tries to complete this task's completer, if one exists, * and then similarly tries to complete this task's completer,
* else marks this task as complete. * if one exists, else marks this task as complete.
*/ */
public final void tryComplete() { public final void tryComplete() {
CountedCompleter<?> a = this, s = a; CountedCompleter<?> a = this, s = a;
@ -573,12 +578,12 @@ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
/** /**
* Equivalent to {@link #tryComplete} but does not invoke {@link * Equivalent to {@link #tryComplete} but does not invoke {@link
* #onCompletion} along the completion path: If the pending count * #onCompletion(CountedCompleter)} along the completion path:
* is nonzero, decrements the count; otherwise, similarly tries to * If the pending count is nonzero, decrements the count;
* complete this task's completer, if one exists, else marks this * otherwise, similarly tries to complete this task's completer, if
* task as complete. This method may be useful in cases where * one exists, else marks this task as complete. This method may be
* {@code onCompletion} should not, or need not, be invoked for * useful in cases where {@code onCompletion} should not, or need
* each completer in a computation. * not, be invoked for each completer in a computation.
*/ */
public final void propagateCompletion() { public final void propagateCompletion() {
CountedCompleter<?> a = this, s = a; CountedCompleter<?> a = this, s = a;
@ -595,13 +600,15 @@ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
} }
/** /**
* Regardless of pending count, invokes {@link #onCompletion}, * Regardless of pending count, invokes
* marks this task as complete and further triggers {@link * {@link #onCompletion(CountedCompleter)}, marks this task as
* #tryComplete} on this task's completer, if one exists. The * complete and further triggers {@link #tryComplete} on this
* given rawResult is used as an argument to {@link #setRawResult} * task's completer, if one exists. The given rawResult is
* before invoking {@link #onCompletion} or marking this task as * used as an argument to {@link #setRawResult} before invoking
* complete; its value is meaningful only for classes overriding * {@link #onCompletion(CountedCompleter)} or marking this task
* {@code setRawResult}. * as complete; its value is meaningful only for classes
* overriding {@code setRawResult}. This method does not modify
* the pending count.
* *
* <p>This method may be useful when forcing completion as soon as * <p>This method may be useful when forcing completion as soon as
* any one (versus all) of several subtask results are obtained. * any one (versus all) of several subtask results are obtained.
@ -641,8 +648,8 @@ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
/** /**
* If this task does not have a completer, invokes {@link * If this task does not have a completer, invokes {@link
* ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if
* this task's pending count is non-zero, decrements its pending * the completer's pending count is non-zero, decrements that
* count and returns {@code null}. Otherwise, returns the * pending count and returns {@code null}. Otherwise, returns the
* completer. This method can be used as part of a completion * completer. This method can be used as part of a completion
* traversal loop for homogeneous task hierarchies: * traversal loop for homogeneous task hierarchies:
* *
@ -684,8 +691,9 @@ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
void internalPropagateException(Throwable ex) { void internalPropagateException(Throwable ex) {
CountedCompleter<?> a = this, s = a; CountedCompleter<?> a = this, s = a;
while (a.onExceptionalCompletion(ex, s) && while (a.onExceptionalCompletion(ex, s) &&
(a = (s = a).completer) != null && a.status >= 0) (a = (s = a).completer) != null && a.status >= 0 &&
a.recordExceptionalCompletion(ex); a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
;
} }
/** /**

View File

@ -33,6 +33,10 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture; import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -53,12 +57,12 @@ import java.util.concurrent.locks.ReentrantLock;
* subtasks. As indicated by the name of this class, many programs * subtasks. As indicated by the name of this class, many programs
* using {@code ForkJoinTask} employ only methods {@link #fork} and * using {@code ForkJoinTask} employ only methods {@link #fork} and
* {@link #join}, or derivatives such as {@link * {@link #join}, or derivatives such as {@link
* #invokeAll(io.netty.util.internal.chmv8.ForkJoinTask...) invokeAll}. However, this class also * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in * provides a number of other methods that can come into play in
* advanced usages, as well as extension mechanics that allow support * advanced usages, as well as extension mechanics that allow support
* of new forms of fork/join processing. * of new forms of fork/join processing.
* *
* <p>A {@code ForkJoinTask} is a lightweight form of {@link java.util.concurrent.Future}. * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of * The efficiency of {@code ForkJoinTask}s stems from a set of
* restrictions (that are only partially statically enforceable) * restrictions (that are only partially statically enforceable)
* reflecting their main use as computational tasks calculating pure * reflecting their main use as computational tasks calculating pure
@ -77,7 +81,7 @@ import java.util.concurrent.locks.ReentrantLock;
* thrown. However, computations may still encounter unchecked * thrown. However, computations may still encounter unchecked
* exceptions, that are rethrown to callers attempting to join * exceptions, that are rethrown to callers attempting to join
* them. These exceptions may additionally include {@link * them. These exceptions may additionally include {@link
* java.util.concurrent.RejectedExecutionException} stemming from internal resource * RejectedExecutionException} stemming from internal resource
* exhaustion, such as failure to allocate internal task * exhaustion, such as failure to allocate internal task
* queues. Rethrown exceptions behave in the same way as regular * queues. Rethrown exceptions behave in the same way as regular
* exceptions, but, when possible, contain stack traces (as displayed * exceptions, but, when possible, contain stack traces (as displayed
@ -101,7 +105,7 @@ import java.util.concurrent.locks.ReentrantLock;
* *
* <p>The primary method for awaiting completion and extracting * <p>The primary method for awaiting completion and extracting
* results of a task is {@link #join}, but there are several variants: * results of a task is {@link #join}, but there are several variants:
* The {@link java.util.concurrent.Future#get} methods support interruptible and/or timed * The {@link Future#get} methods support interruptible and/or timed
* waits for completion and report results using {@code Future} * waits for completion and report results using {@code Future}
* conventions. Method {@link #invoke} is semantically * conventions. Method {@link #invoke} is semantically
* equivalent to {@code fork(); join()} but always attempts to begin * equivalent to {@code fork(); join()} but always attempts to begin
@ -149,10 +153,9 @@ import java.util.concurrent.locks.ReentrantLock;
* (DAG). Otherwise, executions may encounter a form of deadlock as * (DAG). Otherwise, executions may encounter a form of deadlock as
* tasks cyclically wait for each other. However, this framework * tasks cyclically wait for each other. However, this framework
* supports other methods and techniques (for example the use of * supports other methods and techniques (for example the use of
* {@link java.util.concurrent.Phaser}, {@link #helpQuiesce}, and * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* {@link #complete}) that
* may be of use in constructing custom subclasses for problems that * may be of use in constructing custom subclasses for problems that
* are not statically structured as DAGs. To support such usages a * are not statically structured as DAGs. To support such usages, a
* ForkJoinTask may be atomically <em>tagged</em> with a {@code short} * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
* value using {@link #setForkJoinTaskTag} or {@link * value using {@link #setForkJoinTaskTag} or {@link
* #compareAndSetForkJoinTaskTag} and checked using {@link * #compareAndSetForkJoinTaskTag} and checked using {@link
@ -184,7 +187,7 @@ import java.util.concurrent.locks.ReentrantLock;
* overwhelm processing. * overwhelm processing.
* *
* <p>This class provides {@code adapt} methods for {@link Runnable} * <p>This class provides {@code adapt} methods for {@link Runnable}
* and {@link java.util.concurrent.Callable}, that may be of use when mixing execution of * and {@link Callable}, that may be of use when mixing execution of
* {@code ForkJoinTasks} with other kinds of tasks. When all tasks are * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
* of this form, consider using a pool constructed in <em>asyncMode</em>. * of this form, consider using a pool constructed in <em>asyncMode</em>.
* *
@ -197,7 +200,7 @@ import java.util.concurrent.locks.ReentrantLock;
* @author Doug Lea * @author Doug Lea
*/ */
@SuppressWarnings("all") @SuppressWarnings("all")
abstract class ForkJoinTask<V> implements Future<V>, Serializable { public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/* /*
* See the internal documentation of class ForkJoinPool for a * See the internal documentation of class ForkJoinPool for a
@ -302,9 +305,17 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
private int externalAwaitDone() { private int externalAwaitDone() {
int s; int s;
ForkJoinPool.externalHelpJoin(this); ForkJoinPool cp = ForkJoinPool.common;
if ((s = status) >= 0) {
if (cp != null) {
if (this instanceof CountedCompleter)
s = cp.externalHelpComplete((CountedCompleter<?>)this);
else if (cp.tryExternalUnpush(this))
s = doExec();
}
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false; boolean interrupted = false;
while ((s = status) >= 0) { do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) { synchronized (this) {
if (status >= 0) { if (status >= 0) {
@ -318,9 +329,11 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
notifyAll(); notifyAll();
} }
} }
} } while ((s = status) >= 0);
if (interrupted) if (interrupted)
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
}
}
return s; return s;
} }
@ -329,9 +342,15 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
private int externalInterruptibleAwaitDone() throws InterruptedException { private int externalInterruptibleAwaitDone() throws InterruptedException {
int s; int s;
ForkJoinPool cp = ForkJoinPool.common;
if (Thread.interrupted()) if (Thread.interrupted())
throw new InterruptedException(); throw new InterruptedException();
ForkJoinPool.externalHelpJoin(this); if ((s = status) >= 0 && cp != null) {
if (this instanceof CountedCompleter)
cp.externalHelpComplete((CountedCompleter<?>)this);
else if (cp.tryExternalUnpush(this))
doExec();
}
while ((s = status) >= 0) { while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) { synchronized (this) {
@ -617,15 +636,10 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** /**
* A version of "sneaky throw" to relay exceptions * A version of "sneaky throw" to relay exceptions
*/ */
static void rethrow(final Throwable ex) { static void rethrow(Throwable ex) {
if (ex != null) { if (ex != null)
if (ex instanceof Error)
throw (Error)ex;
if (ex instanceof RuntimeException)
throw (RuntimeException)ex;
ForkJoinTask.<RuntimeException>uncheckedThrow(ex); ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
} }
}
/** /**
* The sneaky part of sneaky throw, relying on generics * The sneaky part of sneaky throw, relying on generics
@ -634,7 +648,6 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/ */
@SuppressWarnings("unchecked") static <T extends Throwable> @SuppressWarnings("unchecked") static <T extends Throwable>
void uncheckedThrow(Throwable t) throws T { void uncheckedThrow(Throwable t) throws T {
if (t != null)
throw (T)t; // rely on vacuous cast throw (T)t; // rely on vacuous cast
} }
@ -846,7 +859,7 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* <p>This method is designed to be invoked by <em>other</em> * <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or * tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or * throw an unchecked exception from its computation method, or
* invoke {@link #completeExceptionally}. * invoke {@link #completeExceptionally(Throwable)}.
* *
* @param mayInterruptIfRunning this value has no effect in the * @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to * default implementation because interrupts are not used to
@ -960,8 +973,8 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* retrieves its result. * retrieves its result.
* *
* @return the computed result * @return the computed result
* @throws java.util.concurrent.CancellationException if the computation was cancelled * @throws CancellationException if the computation was cancelled
* @throws java.util.concurrent.ExecutionException if the computation threw an * @throws ExecutionException if the computation threw an
* exception * exception
* @throws InterruptedException if the current thread is not a * @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting * member of a ForkJoinPool and was interrupted while waiting
@ -984,12 +997,12 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @param timeout the maximum time to wait * @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument * @param unit the time unit of the timeout argument
* @return the computed result * @return the computed result
* @throws java.util.concurrent.CancellationException if the computation was cancelled * @throws CancellationException if the computation was cancelled
* @throws java.util.concurrent.ExecutionException if the computation threw an * @throws ExecutionException if the computation threw an
* exception * exception
* @throws InterruptedException if the current thread is not a * @throws InterruptedException if the current thread is not a
* member of a ForkJoinPool and was interrupted while waiting * member of a ForkJoinPool and was interrupted while waiting
* @throws java.util.concurrent.TimeoutException if the wait timed out * @throws TimeoutException if the wait timed out
*/ */
public final V get(long timeout, TimeUnit unit) public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException { throws InterruptedException, ExecutionException, TimeoutException {
@ -998,6 +1011,7 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// Messy in part because we measure in nanosecs, but wait in millisecs // Messy in part because we measure in nanosecs, but wait in millisecs
int s; long ms; int s; long ms;
long ns = unit.toNanos(timeout); long ns = unit.toNanos(timeout);
ForkJoinPool cp;
if ((s = status) >= 0 && ns > 0L) { if ((s = status) >= 0 && ns > 0L) {
long deadline = System.nanoTime() + ns; long deadline = System.nanoTime() + ns;
ForkJoinPool p = null; ForkJoinPool p = null;
@ -1009,8 +1023,12 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
w = wt.workQueue; w = wt.workQueue;
p.helpJoinOnce(w, this); // no retries on failure p.helpJoinOnce(w, this); // no retries on failure
} }
else else if ((cp = ForkJoinPool.common) != null) {
ForkJoinPool.externalHelpJoin(this); if (this instanceof CountedCompleter)
cp.externalHelpComplete((CountedCompleter<?>)this);
else if (cp.tryExternalUnpush(this))
doExec();
}
boolean canBlock = false; boolean canBlock = false;
boolean interrupted = false; boolean interrupted = false;
try { try {
@ -1018,7 +1036,7 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (w != null && w.qlock < 0) if (w != null && w.qlock < 0)
cancelIgnoringExceptions(this); cancelIgnoringExceptions(this);
else if (!canBlock) { else if (!canBlock) {
if (p == null || p.tryCompensate()) if (p == null || p.tryCompensate(p.ctl))
canBlock = true; canBlock = true;
} }
else { else {
@ -1159,7 +1177,7 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Thread t; Thread t;
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
ForkJoinPool.tryExternalUnpush(this)); ForkJoinPool.common.tryExternalUnpush(this));
} }
/** /**
@ -1328,7 +1346,7 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* *
* @param e the expected tag value * @param e the expected tag value
* @param tag the new tag value * @param tag the new tag value
* @return true if successful; i.e., the current value was * @return {@code true} if successful; i.e., the current value was
* equal to e and is now tag. * equal to e and is now tag.
* @since 1.8 * @since 1.8
*/ */
@ -1380,6 +1398,24 @@ abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private static final long serialVersionUID = 5232453952276885070L; private static final long serialVersionUID = 5232453952276885070L;
} }
/**
* Adaptor for Runnables in which failure forces worker exception
*/
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
final Runnable runnable;
RunnableExecuteAction(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
void internalPropagateException(Throwable ex) {
rethrow(ex); // rethrow outside exec() catches.
}
private static final long serialVersionUID = 5232453952276885070L;
}
/** /**
* Adaptor for Callables * Adaptor for Callables
*/ */

View File

@ -22,6 +22,7 @@
package io.netty.util.internal.chmv8; package io.netty.util.internal.chmv8;
/** /**
* A thread managed by a {@link ForkJoinPool}, which executes * A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s. * {@link ForkJoinTask}s.
@ -30,14 +31,14 @@ package io.netty.util.internal.chmv8;
* scheduling or execution. However, you can override initialization * scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop. * and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a * If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
* in a {@code ForkJoinPool}. * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
* *
* @since 1.7 * @since 1.7
* @author Doug Lea * @author Doug Lea
*/ */
@SuppressWarnings("all") @SuppressWarnings("all")
final class ForkJoinWorkerThread extends Thread { public class ForkJoinWorkerThread extends Thread {
/* /*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation * ForkJoinTasks. For explanation, see the internal documentation
@ -77,16 +78,17 @@ final class ForkJoinWorkerThread extends Thread {
} }
/** /**
* Returns the index number of this thread in its pool. The * Returns the unique index number of this thread in its pool.
* returned value ranges from zero to the maximum number of * The returned value ranges from zero to the maximum number of
* threads (minus one) that have ever been created in the pool. * threads (minus one) that may exist in the pool, and does not
* This method may be useful for applications that track status or * change during the lifetime of the thread. This method may be
* collect results per-worker rather than per-task. * useful for applications that track status or collect results
* per-worker-thread rather than per-task.
* *
* @return the index number * @return the index number
*/ */
public int getPoolIndex() { public int getPoolIndex() {
return workQueue.poolIndex; return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
} }
/** /**

View File

@ -21,7 +21,9 @@
*/ */
package io.netty.util.internal.chmv8; package io.netty.util.internal.chmv8;
import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* One or more variables that together maintain an initially zero * One or more variables that together maintain an initially zero
@ -31,7 +33,7 @@ import java.io.Serializable;
* #longValue}) returns the current total combined across the * #longValue}) returns the current total combined across the
* variables maintaining the sum. * variables maintaining the sum.
* *
* <p>This class is usually preferable to {@link java.util.concurrent.atomic.AtomicLong} when * <p>This class is usually preferable to {@link AtomicLong} when
* multiple threads update a common sum that is used for purposes such * multiple threads update a common sum that is used for purposes such
* as collecting statistics, not for fine-grained synchronization * as collecting statistics, not for fine-grained synchronization
* control. Under low update contention, the two classes have similar * control. Under low update contention, the two classes have similar
@ -51,7 +53,7 @@ import java.io.Serializable;
* @author Doug Lea * @author Doug Lea
*/ */
@SuppressWarnings("all") @SuppressWarnings("all")
final class LongAdder extends Striped64 implements Serializable { public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L; private static final long serialVersionUID = 7249069246863182397L;
/** /**

View File

@ -21,6 +21,7 @@
*/ */
package io.netty.util.internal.chmv8; package io.netty.util.internal.chmv8;
import java.util.Random; import java.util.Random;
/** /**