Introduce Future.toStage() which allows to obtain a CompletionStage a… (#9004)

Motivation:

CompletionStage is the new standard for async operation chaining in JDK8+ that is supported by various of libs. To make it easer to interopt with other libs and to allow users to make good use of lambdas and functional programming style we should allow to convert from our Future to a CompletionStage while still provide the same ordering guarantees.

The reason why we expose this as toStage() and not jus have Future extend CompletionStage is for two reasons:
 - Keep our interface norrow
 - Keep semantics clear (Future.addListener(...) methods return this while all chaining methods of CompletionStage return a new instance).

Modifications:

- Merge implements in AbstractFuture to Future (by make these default methods)
- Add Future.toStage() as a default method and a special implemention in DefaultPromise (to reduce GC).
- Add Future.executor() which returns the EventExecutor that is pinned to the Future
- Introduce FutureCompletionStage that extends CompletionStage to clarify threading semantics and guarantees.

Result:

Easier inter-op with other Java8+ libaries. Related to https://github.com/netty/netty/issues/8523.
This commit is contained in:
Norman Maurer 2019-04-11 14:52:33 +02:00 committed by GitHub
parent a57f73e1fe
commit 81244e1ae1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2043 additions and 69 deletions

View File

@ -1,58 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Abstract {@link Future} implementation which does not allow for cancellation.
*
* @param <V>
*/
public abstract class AbstractFuture<V> implements Future<V> {
@Override
public V get() throws InterruptedException, ExecutionException {
await();
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
}

View File

@ -283,6 +283,11 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
this.future = future; this.future = future;
} }
@Override
public EventExecutor executor() {
return future.executor();
}
@Override @Override
public long deadlineNanos() { public long deadlineNanos() {
return future.deadlineNanos(); return future.deadlineNanos();

View File

@ -23,10 +23,14 @@ import java.util.concurrent.TimeUnit;
/** /**
* A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already. * A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already.
*/ */
public abstract class CompleteFuture<V> extends AbstractFuture<V> { public abstract class CompleteFuture<V> implements Future<V> {
private final EventExecutor executor; private final EventExecutor executor;
// It is fine to not make this volatile as even if we override the value in there it does not matter as
// DefaultFutureCompletionStage has no state itself and is just a wrapper around this CompletableFuture instance.
private DefaultFutureCompletionStage<V> stage;
/** /**
* Creates a new instance. * Creates a new instance.
* *
@ -39,7 +43,8 @@ public abstract class CompleteFuture<V> extends AbstractFuture<V> {
/** /**
* Return the {@link EventExecutor} which is used by this {@link CompleteFuture}. * Return the {@link EventExecutor} which is used by this {@link CompleteFuture}.
*/ */
protected EventExecutor executor() { @Override
public EventExecutor executor() {
return executor; return executor;
} }
@ -147,4 +152,13 @@ public abstract class CompleteFuture<V> extends AbstractFuture<V> {
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
return false; return false;
} }
@Override
public FutureCompletionStage<V> asStage() {
DefaultFutureCompletionStage<V> stageAdapter = stage;
if (stageAdapter == null) {
stage = stageAdapter = new DefaultFutureCompletionStage<>(this);
}
return stageAdapter;
}
} }

View File

@ -0,0 +1,563 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
/**
* Wraps a {@link io.netty.util.concurrent.Future} and provides a {@link FutureCompletionStage} implementation
* on top of it.
*
* @param <V> the value type.
*/
final class DefaultFutureCompletionStage<V> implements FutureCompletionStage<V> {
private enum Marker {
EMPTY,
ERROR
}
// Just a marker
private static final Executor SAME_AS_FUTURE = task -> {
throw new UnsupportedOperationException("Only a marker, should never been called!");
};
private final Future<V> future;
DefaultFutureCompletionStage(Future<V> future) {
this.future = future;
}
@Override
public Future<V> future() {
return future;
}
@Override
public <U> FutureCompletionStage<U> thenApply(Function<? super V, ? extends U> fn) {
return thenApplyAsync(fn, SAME_AS_FUTURE);
}
@Override
public <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn) {
return thenApplyAsync(fn, ForkJoinPool.commonPool());
}
@Override
public FutureCompletionStage<Void> thenAccept(Consumer<? super V> action) {
return thenAcceptAsync(action, SAME_AS_FUTURE);
}
@Override
public FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action) {
return thenAcceptAsync(action, ForkJoinPool.commonPool());
}
@Override
public FutureCompletionStage<Void> thenRun(Runnable action) {
return thenRunAsync(action, SAME_AS_FUTURE);
}
@Override
public FutureCompletionStage<Void> thenRunAsync(Runnable action) {
return thenRunAsync(action, ForkJoinPool.commonPool());
}
@Override
public <U, V1> FutureCompletionStage<V1> thenCombine(
CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn) {
return thenCombineAsync(other, fn, SAME_AS_FUTURE);
}
@Override
public <U, V1> FutureCompletionStage<V1> thenCombineAsync(
CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn) {
return thenCombineAsync(other, fn, ForkJoinPool.commonPool());
}
@Override
public <U> FutureCompletionStage<Void> thenAcceptBoth(
CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action) {
return thenAcceptBothAsync(other, action, SAME_AS_FUTURE);
}
@Override
public <U> FutureCompletionStage<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action) {
return thenAcceptBothAsync(other, action, ForkJoinPool.commonPool());
}
@Override
public FutureCompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
return runAfterBothAsync(other, action, SAME_AS_FUTURE);
}
@Override
public FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
return runAfterBothAsync(other, action, ForkJoinPool.commonPool());
}
@Override
public <U> FutureCompletionStage<U> applyToEither(
CompletionStage<? extends V> other, Function<? super V, U> fn) {
return applyToEitherAsync(other, fn, SAME_AS_FUTURE);
}
@Override
public <U> FutureCompletionStage<U> applyToEitherAsync(
CompletionStage<? extends V> other, Function<? super V, U> fn) {
return applyToEitherAsync(other, fn, ForkJoinPool.commonPool());
}
@Override
public FutureCompletionStage<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action) {
return acceptEitherAsync(other, action, SAME_AS_FUTURE);
}
@Override
public FutureCompletionStage<Void> acceptEitherAsync(
CompletionStage<? extends V> other, Consumer<? super V> action) {
return acceptEitherAsync(other, action, ForkJoinPool.commonPool());
}
@Override
public FutureCompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
return runAfterEitherAsync(other, action, SAME_AS_FUTURE);
}
@Override
public FutureCompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
return runAfterEitherAsync(other, action, ForkJoinPool.commonPool());
}
@Override
public <U> FutureCompletionStage<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn) {
return thenComposeAsync(fn, SAME_AS_FUTURE);
}
@Override
public <U> FutureCompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn) {
return thenComposeAsync(fn, ForkJoinPool.commonPool());
}
@Override
public FutureCompletionStage<V> whenComplete(BiConsumer<? super V, ? super Throwable> action) {
return whenCompleteAsync(action, SAME_AS_FUTURE);
}
@Override
public FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action) {
return whenCompleteAsync(action, ForkJoinPool.commonPool());
}
@Override
public <U> FutureCompletionStage<U> handle(BiFunction<? super V, Throwable, ? extends U> fn) {
return handleAsync(fn, SAME_AS_FUTURE);
}
@Override
public <U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn) {
return handleAsync(fn, ForkJoinPool.commonPool());
}
@Override
public <U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn, Executor executor) {
requireNonNull(fn, "fn");
requireNonNull(executor, "executor");
Promise<U> promise = executor().newPromise();
future.addListener(future -> {
Throwable cause = future.cause();
if (cause == null) {
@SuppressWarnings("unchecked") V value = (V) future.getNow();
if (executeDirectly(executor)) {
thenApplyAsync0(promise, value, fn);
} else {
safeExecute(executor, () -> thenApplyAsync0(promise, value, fn), promise);
}
} else {
promise.setFailure(cause);
}
});
return promise.asStage();
}
private static <U, V> void thenApplyAsync0(Promise<U> promise, V value, Function<? super V, ? extends U> fn) {
final U result;
try {
result = fn.apply(value);
} catch (Throwable cause) {
promise.setFailure(cause);
return;
}
promise.setSuccess(result);
}
@Override
public FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action, Executor executor) {
requireNonNull(action, "action");
requireNonNull(executor, "executor");
Promise<Void> promise = executor().newPromise();
future.addListener(future -> {
Throwable cause = future.cause();
if (cause == null) {
@SuppressWarnings("unchecked") V value = (V) future.getNow();
if (executeDirectly(executor)) {
thenAcceptAsync0(promise, value, action);
} else {
safeExecute(executor, () -> thenAcceptAsync0(promise, value, action), promise);
}
} else {
promise.setFailure(cause);
}
});
return promise.asStage();
}
private static <U, V> void thenAcceptAsync0(Promise<U> promise, V value, Consumer<? super V> action) {
try {
action.accept(value);
promise.setSuccess(null);
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
@Override
public FutureCompletionStage<Void> thenRunAsync(Runnable action, Executor executor) {
return thenAcceptAsync(ignore -> action.run(), executor);
}
@Override
public <U, V1> FutureCompletionStage<V1> thenCombineAsync(
CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn, Executor executor) {
requireNonNull(other, "other");
requireNonNull(fn, "fn");
requireNonNull(executor, "executor");
Promise<V1> promise = executor().newPromise();
AtomicReference<Object> reference = new AtomicReference<>(Marker.EMPTY);
abstract class CombineBiConsumer<T1, T2, T> implements BiConsumer<T, Throwable> {
@SuppressWarnings("unchecked")
@Override
public void accept(T v, Throwable error) {
if (error == null) {
if (!reference.compareAndSet(Marker.EMPTY, v)) {
Object rawValue = reference.get();
if (rawValue == Marker.ERROR) {
return;
}
applyAndNotify0(promise, (T1) v, (T2) rawValue, fn);
}
} else {
if (reference.getAndSet(Marker.ERROR) != Marker.ERROR) {
// Did not fail the promise before, do it now.
promise.setFailure(error);
}
}
}
abstract void applyAndNotify0(
Promise<V1> promise, T1 value1, T2 value2, BiFunction<? super V, ? super U, ? extends V1> fn);
}
whenCompleteAsync(new CombineBiConsumer<V, U, V>() {
@Override
void applyAndNotify0(
Promise<V1> promise, V value1, U value2, BiFunction<? super V, ? super U, ? extends V1> fn) {
applyAndNotify(promise, value1, value2, fn);
}
}, executor);
other.whenCompleteAsync(new CombineBiConsumer<U, V, U>() {
@Override
void applyAndNotify0(
Promise<V1> promise, U value1, V value2, BiFunction<? super V, ? super U, ? extends V1> fn) {
applyAndNotify(promise, value2, value1, fn);
}
}, otherExecutor(executor));
return promise.asStage();
}
private Executor otherExecutor(Executor executor) {
return executor == SAME_AS_FUTURE ? executor() : executor;
}
@Override
public <U> FutureCompletionStage<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action, Executor executor) {
requireNonNull(action, "action");
return thenCombineAsync(other, (value, value2) -> {
action.accept(value, value2);
return null;
}, executor);
}
@Override
public FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
requireNonNull(action, "action");
return thenCombineAsync(other, (ignoreOtherValue, ignoreError) -> {
action.run();
return null;
}, executor);
}
@Override
public <U> FutureCompletionStage<U> applyToEitherAsync(
CompletionStage<? extends V> other, Function<? super V, U> fn, Executor executor) {
requireNonNull(other, "other");
requireNonNull(fn, "fn");
Promise<U> promise = executor().newPromise();
BiConsumer<V, Throwable> consumer = new AtomicBiConsumer<V, U>(promise) {
@Override
protected U apply(V value) {
return fn.apply(value);
}
};
whenCompleteAsync(consumer, executor);
other.whenCompleteAsync(consumer, otherExecutor(executor));
return promise.asStage();
}
@Override
public FutureCompletionStage<Void> acceptEitherAsync(
CompletionStage<? extends V> other, Consumer<? super V> action, Executor executor) {
requireNonNull(other, "other");
requireNonNull(action, "action");
Promise<Void> promise = executor().newPromise();
BiConsumer<V, Throwable> consumer = new AtomicBiConsumer<V, Void>(promise) {
@Override
protected Void apply(V value) {
action.accept(value);
return null;
}
};
whenCompleteAsync(consumer, executor);
other.whenCompleteAsync(consumer, otherExecutor(executor));
return promise.asStage();
}
@Override
public FutureCompletionStage<Void> runAfterEitherAsync(
CompletionStage<?> other, Runnable action, Executor executor) {
requireNonNull(other, "other");
requireNonNull(action, "action");
Promise<Void> promise = executor().newPromise();
BiConsumer<Object, Throwable> consumer = new AtomicBiConsumer<Object, Void>(promise) {
@Override
protected Void apply(Object value) {
action.run();
return null;
}
};
whenCompleteAsync(consumer, executor);
other.whenCompleteAsync(consumer, otherExecutor(executor));
return promise.asStage();
}
@Override
public <U> FutureCompletionStage<U> thenComposeAsync(
Function<? super V, ? extends CompletionStage<U>> fn, Executor executor) {
requireNonNull(fn, "fn");
requireNonNull(executor, "executor");
Promise<U> promise = executor().newPromise();
future.addListener(f -> {
Throwable cause = f.cause();
if (cause == null) {
@SuppressWarnings("unchecked") V value = (V) f.getNow();
if (executeDirectly(executor)) {
thenComposeAsync0(promise, fn, value);
} else {
safeExecute(executor, () -> thenComposeAsync0(promise, fn, value), promise);
}
} else {
promise.setFailure(cause);
}
});
return promise.asStage();
}
private static <V, U> void thenComposeAsync0(
Promise<U> promise, Function<? super V, ? extends CompletionStage<U>> fn, V value) {
final CompletionStage<U> result;
try {
result = fn.apply(value);
} catch (Throwable cause) {
promise.setFailure(cause);
return;
}
result.whenComplete((v, error) -> {
if (error == null) {
promise.setSuccess(v);
} else {
promise.setFailure(error);
}
});
}
@Override
public FutureCompletionStage<V> exceptionally(Function<Throwable, ? extends V> fn) {
requireNonNull(fn, "fn");
Promise<V> promise = executor().newPromise();
future.addListener(f -> {
Throwable error = f.cause();
if (error == null) {
@SuppressWarnings("unchecked") V value = (V) f.getNow();
promise.setSuccess(value);
} else {
final V result;
try {
result = fn.apply(error);
} catch (Throwable cause) {
promise.setFailure(cause);
return;
}
promise.setSuccess(result);
}
});
return promise.asStage();
}
@Override
public FutureCompletionStage<V> whenCompleteAsync(
BiConsumer<? super V, ? super Throwable> action, Executor executor) {
requireNonNull(action, "action");
requireNonNull(executor, "executor");
Promise<V> promise = executor().newPromise();
future.addListener(f -> {
if (executeDirectly(executor)) {
whenCompleteAsync0(promise, f, action);
} else {
safeExecute(executor, () -> whenCompleteAsync0(promise, f, action), promise);
}
});
return promise.asStage();
}
private static <V> void whenCompleteAsync0(
Promise<V> promise, Future<? super V> f, BiConsumer<? super V, ? super Throwable> action) {
Throwable cause = f.cause();
@SuppressWarnings("unchecked") V value = cause == null ? (V) f.getNow() : null;
try {
action.accept(value, cause);
} catch (Throwable error) {
promise.setFailure(error);
return;
}
if (cause == null) {
promise.setSuccess(value);
} else {
promise.setFailure(cause);
}
}
@Override
public <U> FutureCompletionStage<U> handleAsync(
BiFunction<? super V, Throwable, ? extends U> fn, Executor executor) {
requireNonNull(fn, "fn");
requireNonNull(executor, "executor");
Promise<U> promise = executor().newPromise();
future.addListener(f -> {
if (executeDirectly(executor)) {
handleAsync0(promise, f, fn);
} else {
safeExecute(executor, () -> handleAsync0(promise, f, fn), promise);
}
});
return promise.asStage();
}
@SuppressWarnings("unchecked")
private static <U, V> void handleAsync0(
Promise<U> promise, Future<? super V> f, BiFunction<? super V, Throwable, ? extends U> fn) {
Throwable cause = f.cause();
applyAndNotify(promise, cause == null ? (V) f.getNow() : null, cause, fn);
}
private static <U, V, T> void applyAndNotify(
Promise<U> promise, V value, T value2, BiFunction<? super V, ? super T, ? extends U> fn) {
final U result;
try {
result = fn.apply(value, value2);
} catch (Throwable error) {
promise.setFailure(error);
return;
}
promise.setSuccess(result);
}
private static boolean executeDirectly(Executor executor) {
return executor == SAME_AS_FUTURE;
}
private static void safeExecute(Executor executor, Runnable task, Promise<?> promise) {
try {
executor.execute(task);
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
private abstract static class AtomicBiConsumer<V, U> extends AtomicReference<Object>
implements BiConsumer<V, Throwable> {
private final Promise<U> promise;
AtomicBiConsumer(Promise<U> promise) {
super(Marker.EMPTY);
this.promise = promise;
}
@Override
public void accept(V v, Throwable error) {
if (error == null) {
if (compareAndSet(Marker.EMPTY, v)) {
final U value;
try {
value = apply(v);
} catch (Throwable cause) {
promise.setFailure(cause);
return;
}
promise.setSuccess(value);
}
} else if (compareAndSet(Marker.EMPTY, Marker.ERROR)) {
promise.setFailure(error);
}
}
protected abstract U apply(V value);
}
}

View File

@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { public class DefaultPromise<V> implements Promise<V> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
private static final InternalLogger rejectedExecutionLogger = private static final InternalLogger rejectedExecutionLogger =
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
@ -47,6 +47,11 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private volatile Object result; private volatile Object result;
private final EventExecutor executor; private final EventExecutor executor;
// It is fine to not make this volatile as even if we override the value in there it does not matter as
// DefaultFutureCompletionStage has no state itself and is just a wrapper around this DefaultPromise instance.
private DefaultFutureCompletionStage<V> stage;
/** /**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}. * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified. * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
@ -73,6 +78,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
*/ */
public DefaultPromise(EventExecutor executor) { public DefaultPromise(EventExecutor executor) {
this.executor = requireNonNull(executor, "executor"); this.executor = requireNonNull(executor, "executor");
stage = new DefaultFutureCompletionStage<>(this);
} }
@Override @Override
@ -363,7 +369,8 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
* depth exceeds a threshold. * depth exceeds a threshold.
* @return The executor used to notify listeners when this promise is complete. * @return The executor used to notify listeners when this promise is complete.
*/ */
protected final EventExecutor executor() { @Override
public final EventExecutor executor() {
return executor; return executor;
} }
@ -736,4 +743,13 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t); rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
} }
} }
@Override
public FutureCompletionStage<V> asStage() {
DefaultFutureCompletionStage<V> stageAdapter = stage;
if (stageAdapter == null) {
stage = stageAdapter = new DefaultFutureCompletionStage<>(this);
}
return stageAdapter;
}
} }

View File

@ -16,8 +16,9 @@
package io.netty.util.concurrent; package io.netty.util.concurrent;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* The result of an asynchronous operation. * The result of an asynchronous operation.
@ -174,4 +175,46 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
*/ */
@Override @Override
boolean cancel(boolean mayInterruptIfRunning); boolean cancel(boolean mayInterruptIfRunning);
/**
* Returns the {@link EventExecutor} that is tied to this {@link Future}.
*/
EventExecutor executor();
@Override
default V get() throws InterruptedException, ExecutionException {
await();
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
default V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
/**
* Returns a {@link FutureCompletionStage} that reflects the state of this {@link Future} and so will receive
* all updates as well.
*/
default FutureCompletionStage<V> asStage() {
return new DefaultFutureCompletionStage<>(this);
}
} }

View File

@ -0,0 +1,213 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.StringUtil;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* A {@link CompletionStage} that provides the same threading semantics and guarantees as the underlying
* {@link io.netty.util.concurrent.Future}, which means that all the callbacks will be executed by {@link #executor()}
* if not specified otherwise (by calling the corresponding *Async methods).
*
* Please be aware that {@link FutureCompletionStage#toCompletableFuture()} is not supported and so will throw
* a {@link UnsupportedOperationException} when invoked.
*
* @param <V> the value type.
*/
public interface FutureCompletionStage<V> extends CompletionStage<V> {
/**
* Returns the underlying {@link Future} of this {@link FutureCompletionStage}.
*/
Future<V> future();
/**
* See {@link Future#executor()}.
*/
default EventExecutor executor() {
return future().executor();
}
/**
* Not supported and so throws an {@link UnsupportedOperationException}.
*/
@Override
default CompletableFuture<V> toCompletableFuture() {
throw new UnsupportedOperationException("Not supported by "
+ StringUtil.simpleClassName(FutureCompletionStage.class));
}
@Override
<U> FutureCompletionStage<U> thenApply(Function<? super V, ? extends U> fn);
@Override
<U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn);
@Override
FutureCompletionStage<Void> thenAccept(Consumer<? super V> action);
@Override
FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action);
@Override
FutureCompletionStage<Void> thenRun(Runnable action);
@Override
FutureCompletionStage<Void> thenRunAsync(Runnable action);
@Override
<U, V1> FutureCompletionStage<V1> thenCombine(
CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn);
@Override
<U, V1> FutureCompletionStage<V1> thenCombineAsync(
CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn);
@Override
<U> FutureCompletionStage<Void> thenAcceptBoth(
CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action);
@Override
<U> FutureCompletionStage<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action);
@Override
FutureCompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
@Override
FutureCompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
@Override
<U> FutureCompletionStage<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn);
@Override
<U> FutureCompletionStage<U> applyToEitherAsync(CompletionStage<? extends V> other, Function<? super V, U> fn);
@Override
FutureCompletionStage<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action);
@Override
FutureCompletionStage<Void> acceptEitherAsync(CompletionStage<? extends V> other, Consumer<? super V> action);
@Override
FutureCompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
@Override
FutureCompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
@Override
<U> FutureCompletionStage<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn);
@Override
<U> FutureCompletionStage<U> thenComposeAsync(Function<? super V, ? extends CompletionStage<U>> fn);
@Override
FutureCompletionStage<V> whenComplete(BiConsumer<? super V, ? super Throwable> action);
@Override
FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action);
@Override
<U> FutureCompletionStage<U> handle(BiFunction<? super V, Throwable, ? extends U> fn);
@Override
<U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn);
@Override
<U> FutureCompletionStage<U> thenApplyAsync(Function<? super V, ? extends U> fn, Executor executor);
@Override
FutureCompletionStage<Void> thenAcceptAsync(Consumer<? super V> action, Executor executor);
@Override
FutureCompletionStage<Void> thenRunAsync(Runnable action, Executor executor);
@Override
<U, V1> FutureCompletionStage<V1> thenCombineAsync(
CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn, Executor executor);
@Override
<U> FutureCompletionStage<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action, Executor executor);
@Override
FutureCompletionStage<Void> runAfterBothAsync(
CompletionStage<?> other, Runnable action, Executor executor);
@Override
<U> FutureCompletionStage<U> applyToEitherAsync(
CompletionStage<? extends V> other, Function<? super V, U> fn, Executor executor);
@Override
FutureCompletionStage<Void> acceptEitherAsync(
CompletionStage<? extends V> other, Consumer<? super V> action, Executor executor);
@Override
FutureCompletionStage<Void> runAfterEitherAsync(
CompletionStage<?> other, Runnable action, Executor executor);
@Override
<U> FutureCompletionStage<U> thenComposeAsync(
Function<? super V, ? extends CompletionStage<U>> fn, Executor executor);
@Override
FutureCompletionStage<V> exceptionally(Function<Throwable, ? extends V> fn);
@Override
FutureCompletionStage<V> whenCompleteAsync(BiConsumer<? super V, ? super Throwable> action, Executor executor);
@Override
<U> FutureCompletionStage<U> handleAsync(BiFunction<? super V, Throwable, ? extends U> fn, Executor executor);
/**
* Returns a {@link FutureCompletionStage} for the given {@link CompletionStage}
* that is pinned to the given {@link EventExecutor}.
*/
static <U> FutureCompletionStage<U> toFutureCompletionStage(CompletionStage<U> stage, EventExecutor executor) {
Objects.requireNonNull(stage, "stage");
Objects.requireNonNull(executor, "executor");
if (stage instanceof FutureCompletionStage && ((FutureCompletionStage) stage).executor() == executor) {
return (FutureCompletionStage<U>) stage;
}
// Try fast-path for CompletableFuture instances that are already complete to reduce object creation.
if (stage instanceof CompletableFuture) {
CompletableFuture<U> future = (CompletableFuture<U>) stage;
if (future.isDone() && !future.isCompletedExceptionally()) {
return executor.newSucceededFuture(future.getNow(null)).asStage();
}
}
Promise<U> promise = executor.newPromise();
stage.whenComplete((v, cause) -> {
if (cause != null) {
promise.setFailure(cause);
} else {
promise.setSuccess(v);
}
});
return promise.asStage();
}
}

View File

@ -34,6 +34,11 @@ final class RunnableFutureAdapter<V> implements RunnableFuture<V> {
this.task = requireNonNull(task, "task"); this.task = requireNonNull(task, "task");
} }
@Override
public EventExecutor executor() {
return promise.executor();
}
@Override @Override
public boolean isSuccess() { public boolean isSuccess() {
return promise.isSuccess(); return promise.isSuccess();

View File

@ -53,6 +53,11 @@ final class RunnableScheduledFutureAdapter<V> implements RunnableScheduledFuture
this.periodNanos = periodNanos; this.periodNanos = periodNanos;
} }
@Override
public EventExecutor executor() {
return executor;
}
@Override @Override
public long deadlineNanos() { public long deadlineNanos() {
return deadlineNanos; return deadlineNanos;

View File

@ -0,0 +1,68 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
public class FutureCompletionStageTest {
@Test
public void testCompleteFuture() {
FutureCompletionStage stage = FutureCompletionStage.toFutureCompletionStage(
CompletableFuture.completedFuture(Boolean.TRUE), ImmediateEventExecutor.INSTANCE);
assertSame(ImmediateEventExecutor.INSTANCE, stage.executor());
assertSame(Boolean.TRUE, stage.future().syncUninterruptibly().getNow());
}
@Test
public void testCompleteFutureFailed() {
IllegalStateException exception = new IllegalStateException();
CompletableFuture<Boolean> future = new CompletableFuture<>();
future.completeExceptionally(exception);
FutureCompletionStage stage = FutureCompletionStage.toFutureCompletionStage(
future, ImmediateEventExecutor.INSTANCE);
assertSame(ImmediateEventExecutor.INSTANCE, stage.executor());
assertSame(exception, stage.future().awaitUninterruptibly().cause());
}
@Test
public void testFutureCompletionStageWithSameExecutor() {
FutureCompletionStage<Boolean> stage = ImmediateEventExecutor.INSTANCE
.newSucceededFuture(Boolean.TRUE).asStage();
assertSame(stage, FutureCompletionStage.toFutureCompletionStage(stage, ImmediateEventExecutor.INSTANCE));
}
@Test
public void testFutureCompletionStageWithDifferentExecutor() {
MultithreadEventExecutorGroup group = new MultithreadEventExecutorGroup(1, Executors.defaultThreadFactory());
try {
FutureCompletionStage<Boolean> stage = group.next().newSucceededFuture(Boolean.TRUE).asStage();
FutureCompletionStage<Boolean> stage2 = FutureCompletionStage.toFutureCompletionStage(
stage, ImmediateEventExecutor.INSTANCE);
assertNotSame(stage, stage2);
assertSame(stage.future().syncUninterruptibly().getNow(), stage2.future().syncUninterruptibly().getNow());
} finally {
group.shutdownGracefully();
}
}
}

View File

@ -42,7 +42,7 @@ abstract class CompleteChannelFuture extends CompleteFuture<Void> implements Cha
} }
@Override @Override
protected EventExecutor executor() { public EventExecutor executor() {
EventExecutor e = super.executor(); EventExecutor e = super.executor();
if (e == null) { if (e == null) {
return channel().eventLoop(); return channel().eventLoop();

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.PromiseNotificationUtil;
@ -44,6 +45,11 @@ public final class DelegatingChannelPromiseNotifier implements ChannelPromise, C
this.logNotifyFailure = logNotifyFailure; this.logNotifyFailure = logNotifyFailure;
} }
@Override
public EventExecutor executor() {
return delegate.executor();
}
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
InternalLogger internalLogger = logNotifyFailure ? logger : null; InternalLogger internalLogger = logNotifyFailure ? logger : null;

View File

@ -17,7 +17,7 @@ package io.netty.channel;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import io.netty.util.concurrent.AbstractFuture; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
@ -25,7 +25,7 @@ import io.netty.util.internal.UnstableApi;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@UnstableApi @UnstableApi
public final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise { public final class VoidChannelPromise implements ChannelPromise {
private final Channel channel; private final Channel channel;
// Will be null if we should not propagate exceptions through the pipeline on failure case. // Will be null if we should not propagate exceptions through the pipeline on failure case.
@ -51,6 +51,11 @@ public final class VoidChannelPromise extends AbstractFuture<Void> implements Ch
} }
} }
@Override
public EventExecutor executor() {
return channel.eventLoop();
}
@Override @Override
public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
fail(); fail();

View File

@ -45,7 +45,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
private static final AtomicInteger nextId = new AtomicInteger(); private static final AtomicInteger nextId = new AtomicInteger();
private final String name; private final String name;
private final EventExecutor executor; final EventExecutor executor;
private final ConcurrentMap<ChannelId, Channel> serverChannels = new ConcurrentHashMap<>(); private final ConcurrentMap<ChannelId, Channel> serverChannels = new ConcurrentHashMap<>();
private final ConcurrentMap<ChannelId, Channel> nonServerChannels = new ConcurrentHashMap<>(); private final ConcurrentMap<ChannelId, Channel> nonServerChannels = new ConcurrentHashMap<>();
private final ChannelFutureListener remover = future -> remove(future.channel()); private final ChannelFutureListener remover = future -> remove(future.channel());

View File

@ -17,6 +17,7 @@ package io.netty.channel.group;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
@ -27,12 +28,17 @@ import java.util.concurrent.TimeUnit;
final class VoidChannelGroupFuture implements ChannelGroupFuture { final class VoidChannelGroupFuture implements ChannelGroupFuture {
private static final Iterator<ChannelFuture> EMPTY = Collections.<ChannelFuture>emptyList().iterator(); private static final Iterator<ChannelFuture> EMPTY = Collections.<ChannelFuture>emptyList().iterator();
private final ChannelGroup group; private final DefaultChannelGroup group;
VoidChannelGroupFuture(ChannelGroup group) { VoidChannelGroupFuture(DefaultChannelGroup group) {
this.group = group; this.group = group;
} }
@Override
public EventExecutor executor() {
return group.executor;
}
@Override @Override
public ChannelGroup group() { public ChannelGroup group() {
return group; return group;