Add a Futures class with static map() and flatMap() methods (#11607)

Motivation:
Making futures easier to compose, combine, and extend is useful to have as part of the API, since implementing this correctly and efficiently can be tricky.

Modification:
Add `Future.map(Function<V,R>) -> Future<R>` and `Future.flatMap(Function<V,Future<R>>) -> Future<R>` default methods to the `Future` interface.
These methods return new Future instance, that will be completed when the original future completes, and the result will be processed through the given mapping function.
These two methods take care to propagate cancellation and exceptions correctly:
Cancellation propagates both ways between the new and original future.
Failures only propagate from the original future to the returned new Future instance.

Result:
A few convenient methods for modifying and composing futures.

This PR fixes #8523, and perhaps also #2105
This commit is contained in:
Chris Vest 2021-08-26 11:23:12 +02:00 committed by GitHub
parent 584a275a7b
commit 445f747ce3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 608 additions and 134 deletions

View File

@ -20,28 +20,23 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Function;
/** /**
* The result of an asynchronous operation. * The result of an asynchronous operation.
* <p> * <p>
* An asynchronous operation is one that might be completed outside a given * An asynchronous operation is one that might be completed outside a given thread of execution. The operation can
* thread of execution. The operation can either be performing computation, * either be performing computation, or I/O, or both.
* or I/O, or both.
* <p> * <p>
* All I/O operations in Netty are asynchronous. It means any I/O calls will * All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that
* return immediately with no guarantee that the requested I/O operation has * the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a {@link
* been completed at the end of the call. Instead, you will be returned with * Future} instance which gives you the information about the result or status of the I/O operation.
* a {@link Future} instance which gives you the information about the
* result or status of the I/O operation.
* <p> * <p>
* A {@link Future} is either <em>uncompleted</em> or <em>completed</em>. * A {@link Future} is either <em>uncompleted</em> or <em>completed</em>. When an I/O operation begins, a new future
* When an I/O operation begins, a new future object is created. The new future * object is created. The new future is uncompleted initially - it is neither succeeded, failed, nor cancelled because
* is uncompleted initially - it is neither succeeded, failed, nor cancelled * the I/O operation is not finished yet. If the I/O operation is finished either successfully, with failure, or by
* because the I/O operation is not finished yet. If the I/O operation is * cancellation, the future is marked as completed with more specific information, such as the cause of the failure.
* finished either successfully, with failure, or by cancellation, the future is * Please note that even failure and cancellation belong to the completed state.
* marked as completed with more specific information, such as the cause of the
* failure. Please note that even failure and cancellation belong to the
* completed state.
* <pre> * <pre>
* +---------------------------+ * +---------------------------+
* | Completed successfully | * | Completed successfully |
@ -60,41 +55,34 @@ import java.util.concurrent.TimeoutException;
* | isCancelled() = true | * | isCancelled() = true |
* +---------------------------+ * +---------------------------+
* </pre> * </pre>
* * <p>
* Various methods are provided to let you check if the I/O operation has been * Various methods are provided to let you check if the I/O operation has been completed, wait for the completion, and
* completed, wait for the completion, and retrieve the result of the I/O * retrieve the result of the I/O operation. It also allows you to add {@link FutureListener}s so you can get notified
* operation. It also allows you to add {@link FutureListener}s so you * when the I/O operation is completed.
* can get notified when the I/O operation is completed.
* *
* <h3>Prefer {@link #addListener(FutureListener)} to {@link #await()}</h3> * <h3>Prefer {@link #addListener(FutureListener)} to {@link #await()}</h3>
* * <p>
* It is recommended to prefer {@link #addListener(FutureListener)}, or * It is recommended to prefer {@link #addListener(FutureListener)}, or {@link #addListener(Object,
* {@link #addListener(Object, FutureContextListener)}, to {@link #await()} * FutureContextListener)}, to {@link #await()} wherever possible to get notified when an I/O operation is done and to
* wherever possible to get notified when an I/O operation is done and to
* do any follow-up tasks. * do any follow-up tasks.
* <p> * <p>
* The {@link #addListener(FutureListener)} method is non-blocking. It simply adds * The {@link #addListener(FutureListener)} method is non-blocking. It simply adds the specified {@link FutureListener}
* the specified {@link FutureListener} to the {@link Future}, and the I/O thread * to the {@link Future}, and the I/O thread will notify the listeners when the I/O operation associated with the future
* will notify the listeners when the I/O operation associated with the future is * is done. The {@link FutureListener} and {@link FutureContextListener} callbacks yield the best performance and
* done. The {@link FutureListener} and {@link FutureContextListener} callbacks * resource utilization because it does not block at all, but it could be tricky to implement a sequential logic if you
* yield the best performance and resource utilization because it does not block at * are not used to event-driven programming.
* all, but it could be tricky to implement a sequential logic if you are not used to
* event-driven programming.
* <p> * <p>
* By contrast, {@link #await()} is a blocking operation. Once called, the * By contrast, {@link #await()} is a blocking operation. Once called, the caller thread blocks until the operation is
* caller thread blocks until the operation is done. It is easier to implement * done. It is easier to implement a sequential logic with {@link #await()}, but the caller thread blocks unnecessarily
* a sequential logic with {@link #await()}, but the caller thread blocks * until the I/O operation is done and there's relatively expensive cost of inter-thread notification. Moreover, there's
* unnecessarily until the I/O operation is done and there's relatively * a chance of dead-lock in a particular circumstance, which is described below.
* expensive cost of inter-thread notification. Moreover, there's a chance of
* dead-lock in a particular circumstance, which is described below.
* *
* <h3>Do not call {@link #await()} inside a {@link io.netty.channel.ChannelHandler}</h3> * <h3>Do not call {@link #await()} inside a {@link io.netty.channel.ChannelHandler}</h3>
* <p> * <p>
* The event handler methods in {@link io.netty.channel.ChannelHandler} are usually * The event handler methods in {@link io.netty.channel.ChannelHandler} are usually called by an I/O thread. If {@link
* called by an I/O thread. If {@link #await()} is called by an event handler method, * #await()} is called by an event handler method, which is called by the I/O thread, the I/O operation it is waiting
* which is called by the I/O thread, the I/O operation it is waiting for might never * for might never complete because {@link #await()} can block the I/O operation it is waiting for, which is a
* complete because {@link #await()} can block the I/O operation it is waiting for, * dead-lock.
* which is a dead-lock.
* <pre> * <pre>
* // BAD - NEVER DO THIS * // BAD - NEVER DO THIS
* {@code @Override} * {@code @Override}
@ -118,19 +106,16 @@ import java.util.concurrent.TimeoutException;
* } * }
* </pre> * </pre>
* <p> * <p>
* In spite of the disadvantages mentioned above, there are certainly the cases * In spite of the disadvantages mentioned above, there are certainly the cases where it is more convenient to call
* where it is more convenient to call {@link #await()}. In such a case, please * {@link #await()}. In such a case, please make sure you do not call {@link #await()} in an I/O thread. Otherwise,
* make sure you do not call {@link #await()} in an I/O thread. Otherwise,
* {@link BlockingOperationException} will be raised to prevent a dead-lock. * {@link BlockingOperationException} will be raised to prevent a dead-lock.
* *
* <h3>Do not confuse I/O timeout and await timeout</h3> * <h3>Do not confuse I/O timeout and await timeout</h3>
* * <p>
* The timeout value you specify with {@link #await(long)}, * The timeout value you specify with {@link #await(long)}, {@link #await(long, TimeUnit)}, {@link
* {@link #await(long, TimeUnit)}, {@link #awaitUninterruptibly(long)}, or * #awaitUninterruptibly(long)}, or {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O timeout at
* {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O * all. If an I/O operation times out, the future will be marked as 'completed with failure,' as depicted in the
* timeout at all. If an I/O operation times out, the future will be marked as * diagram above. For example, connect timeout should be configured via a transport-specific option:
* 'completed with failure,' as depicted in the diagram above. For example,
* connect timeout should be configured via a transport-specific option:
* <pre> * <pre>
* // BAD - NEVER DO THIS * // BAD - NEVER DO THIS
* {@link io.netty.bootstrap.Bootstrap} b = ...; * {@link io.netty.bootstrap.Bootstrap} b = ...;
@ -183,119 +168,99 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isCancellable(); boolean isCancellable();
/** /**
* Returns the cause of the failed I/O operation if the I/O operation has * Returns the cause of the failed I/O operation if the I/O operation has failed.
* failed.
* *
* @return the cause of the failure. * @return the cause of the failure. {@code null} if succeeded.
* {@code null} if succeeded.
* @throws IllegalStateException if this {@code Future} has not completed yet. * @throws IllegalStateException if this {@code Future} has not completed yet.
*/ */
Throwable cause(); Throwable cause();
/** /**
* Adds the specified listener to this future. * Adds the specified listener to this future. The specified listener is notified when this future is {@linkplain
* The specified listener is notified when this future is {@linkplain #isDone() done}. * #isDone() done}. If this future is already completed, the specified listener is notified immediately.
* If this future is already completed, the specified listener is notified immediately.
* *
* @param listener The listener to be called when this future completes. * @param listener The listener to be called when this future completes. The listener will be passed this future as
* The listener will be passed this future as an argument. * an argument.
* @return this future object. * @return this future object.
*/ */
Future<V> addListener(FutureListener<? super V> listener); Future<V> addListener(FutureListener<? super V> listener);
/** /**
* Adds the specified listener to this future. * Adds the specified listener to this future. The specified listener is notified when this future is {@linkplain
* The specified listener is notified when this future is {@linkplain #isDone() done}. * #isDone() done}. If this future is already completed, the specified listener is notified immediately.
* If this future is already completed, the specified listener is notified immediately.
* *
* @param context The context object that will be passed to the listener when this future completes. * @param context The context object that will be passed to the listener when this future completes.
* @param listener The listener to be called when this future completes. * @param listener The listener to be called when this future completes. The listener will be passed the given
* The listener will be passed the given context, and this future. * context, and this future.
* @return this future object. * @return this future object.
*/ */
<C> Future<V> addListener(C context, FutureContextListener<? super C, ? super V> listener); <C> Future<V> addListener(C context, FutureContextListener<? super C, ? super V> listener);
/** /**
* Waits for this future until it is done, and rethrows the cause of the failure if this future * Waits for this future until it is done, and rethrows the cause of the failure if this future failed.
* failed.
* *
* @throws CancellationException if the computation was cancelled * @throws CancellationException if the computation was cancelled
* @throws CompletionException if the computation threw an exception. * @throws CompletionException if the computation threw an exception.
* @throws InterruptedException if the current thread was interrupted while waiting * @throws InterruptedException if the current thread was interrupted while waiting
*
*/ */
Future<V> sync() throws InterruptedException; Future<V> sync() throws InterruptedException;
/** /**
* Waits for this future until it is done, and rethrows the cause of the failure if this future * Waits for this future until it is done, and rethrows the cause of the failure if this future failed.
* failed.
* *
* @throws CancellationException if the computation was cancelled * @throws CancellationException if the computation was cancelled
* @throws CompletionException if the computation threw an exception. * @throws CompletionException if the computation threw an exception.
*/ */
Future<V> syncUninterruptibly(); Future<V> syncUninterruptibly();
/** /**
* Waits for this future to be completed. * Waits for this future to be completed.
* *
* @throws InterruptedException * @throws InterruptedException if the current thread was interrupted
* if the current thread was interrupted
*/ */
Future<V> await() throws InterruptedException; Future<V> await() throws InterruptedException;
/** /**
* Waits for this future to be completed without * Waits for this future to be completed without interruption. This method catches an {@link InterruptedException}
* interruption. This method catches an {@link InterruptedException} and * and discards it silently.
* discards it silently.
*/ */
Future<V> awaitUninterruptibly(); Future<V> awaitUninterruptibly();
/** /**
* Waits for this future to be completed within the * Waits for this future to be completed within the specified time limit.
* specified time limit.
* *
* @return {@code true} if and only if the future was completed within * @return {@code true} if and only if the future was completed within the specified time limit
* the specified time limit * @throws InterruptedException if the current thread was interrupted
*
* @throws InterruptedException
* if the current thread was interrupted
*/ */
boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/** /**
* Waits for this future to be completed within the * Waits for this future to be completed within the specified time limit.
* specified time limit.
* *
* @return {@code true} if and only if the future was completed within * @return {@code true} if and only if the future was completed within the specified time limit
* the specified time limit * @throws InterruptedException if the current thread was interrupted
*
* @throws InterruptedException
* if the current thread was interrupted
*/ */
boolean await(long timeoutMillis) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException;
/** /**
* Waits for this future to be completed within the * Waits for this future to be completed within the specified time limit without interruption. This method catches
* specified time limit without interruption. This method catches an * an {@link InterruptedException} and discards it silently.
* {@link InterruptedException} and discards it silently.
* *
* @return {@code true} if and only if the future was completed within * @return {@code true} if and only if the future was completed within the specified time limit
* the specified time limit
*/ */
boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/** /**
* Waits for this future to be completed within the * Waits for this future to be completed within the specified time limit without interruption. This method catches
* specified time limit without interruption. This method catches an * an {@link InterruptedException} and discards it silently.
* {@link InterruptedException} and discards it silently.
* *
* @return {@code true} if and only if the future was completed within * @return {@code true} if and only if the future was completed within the specified time limit
* the specified time limit
*/ */
boolean awaitUninterruptibly(long timeoutMillis); boolean awaitUninterruptibly(long timeoutMillis);
/** /**
* Return the result without blocking. If the future is not done yet this will throw {@link IllegalStateException}. * Return the result without blocking. If the future is not done yet this will throw {@link IllegalStateException}.
* <p>
* *
* @throws IllegalStateException if this {@code Future} has not completed yet. * @throws IllegalStateException if this {@code Future} has not completed yet.
*/ */
@ -303,7 +268,7 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
/** /**
* {@inheritDoc} * {@inheritDoc}
* * <p>
* If the cancellation was successful it will fail the future with a {@link CancellationException}. * If the cancellation was successful it will fail the future with a {@link CancellationException}.
*/ */
@Override @Override
@ -344,10 +309,55 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
} }
/** /**
* Returns a {@link FutureCompletionStage} that reflects the state of this {@link Future} and so will receive * Returns a {@link FutureCompletionStage} that reflects the state of this {@link Future} and so will receive all
* all updates as well. * updates as well.
*/ */
default FutureCompletionStage<V> asStage() { default FutureCompletionStage<V> asStage() {
return new DefaultFutureCompletionStage<>(this); return new DefaultFutureCompletionStage<>(this);
} }
/**
* Creates a <strong>new</strong> {@link Future} that will complete with the result of this {@link Future} mapped
* through the given mapper function.
* <p>
* If this future fails, then the returned future will fail as well, with the same exception. Cancellation of either
* future will cancel the other. If the mapper function throws, the returned future will fail, but this future will
* be unaffected.
*
* @param mapper The function that will convert the result of this future into the result of the returned future.
* @param <R> The result type of the mapper function, and of the returned future.
* @return A new future instance that will complete with the mapped result of this future.
*/
default <R> Future<R> map(Function<V, R> mapper) {
return Futures.map(this, mapper);
}
/**
* Creates a <strong>new</strong> {@link Future} that will complete with the result of this {@link Future}
* flat-mapped through the given mapper function.
* <p>
* The "flat" in "flat-map" means the given mapper function produces a result that itself is a future-of-R, yet this
* method also returns a future-of-R, rather than a future-of-future-of-R. In other words, if the same mapper
* function was used with the {@link #map(Function)} method, you would get back a {@code Future<Future<R>>}. These
* nested futures are "flattened" into a {@code Future<R>} by this method.
* <p>
* Effectively, this method behaves similar to this serial code, except asynchronously and with proper exception and
* cancellation handling:
* <pre>{@code
* V x = future.sync().getNow();
* Future<R> y = mapper.apply(x);
* R result = y.sync().getNow();
* }</pre>
* <p>
* If the given future fails, then the returned future will fail as well, with the same exception. Cancellation of
* either future will cancel the other. If the mapper function throws, the returned future will fail, but this
* future will be unaffected.
*
* @param mapper The function that will convert the result of this future into the result of the returned future.
* @param <R> The result type of the mapper function, and of the returned future.
* @return A new future instance that will complete with the mapped result of this future.
*/
default <R> Future<R> flatMap(Function<V, Future<R>> mapper) {
return Futures.flatMap(this, mapper);
}
} }

View File

@ -0,0 +1,232 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.Callable;
import java.util.function.Function;
import static io.netty.util.internal.PromiseNotificationUtil.tryFailure;
import static java.util.Objects.requireNonNull;
/**
* Combinator operations on {@linkplain Future futures}.
* <p>
* Used for implementing {@link Future#map(Function)} and {@link Future#flatMap(Function)}
*
* @implNote The operations themselves are implemented as static inner classes instead of lambdas to aid debugging.
*/
final class Futures {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Futures.class);
private static final PassThrough<?> PASS_THROUGH = new PassThrough<Object>();
private static final PropagateCancel PROPAGATE_CANCEL = new PropagateCancel();
/**
* Creates a new {@link Future} that will complete with the result of the given {@link Future} mapped through the
* given mapper function.
* <p>
* If the given future fails, then the returned future will fail as well, with the same exception. Cancellation of
* either future will cancel the other. If the mapper function throws, the returned future will fail, but the given
* future will be unaffected.
*
* @param future The future whose result will flow to the returned future, through the mapping function.
* @param mapper The function that will convert the result of the given future into the result of the returned
* future.
* @param <R> The result type of the mapper function, and of the returned future.
* @return A new future instance that will complete with the mapped result of the given future.
*/
public static <V, R> Future<R> map(Future<V> future, Function<V, R> mapper) {
requireNonNull(future, "future");
requireNonNull(mapper, "mapper");
if (future.isFailed()) {
@SuppressWarnings("unchecked") // Cast is safe because the result type is not used in failed futures.
Future<R> failed = (Future<R>) future;
return failed;
}
if (future.isSuccess()) {
return future.executor().submit(new CallableMapper<>(future, mapper));
}
Promise<R> promise = future.executor().newPromise();
future.addListener(new Mapper<>(promise, mapper));
promise.addListener(future, propagateCancel());
return promise;
}
/**
* Creates a new {@link Future} that will complete with the result of the given {@link Future} flat-mapped through
* the given mapper function.
* <p>
* The "flat" in "flat-map" means the given mapper function produces a result that itself is a future-of-R, yet this
* method also returns a future-of-R, rather than a future-of-future-of-R. In other words, if the same mapper
* function was used with the {@link #map(Future, Function)} method, you would get back a {@code Future<Future<R>>}.
* These nested futures are "flattened" into a {@code Future<R>} by this method.
* <p>
* Effectively, this method behaves similar to this serial code, except asynchronously and with proper exception and
* cancellation handling:
* <pre>{@code
* V x = future.sync().getNow();
* Future<R> y = mapper.apply(x);
* R result = y.sync().getNow();
* }</pre>
* <p>
* If the given future fails, then the returned future will fail as well, with the same exception. Cancellation of
* either future will cancel the other. If the mapper function throws, the returned future will fail, but the given
* future will be unaffected.
*
* @param mapper The function that will convert the result of the given future into the result of the returned
* future.
* @param <R> The result type of the mapper function, and of the returned future.
* @return A new future instance that will complete with the mapped result of the given future.
*/
public static <V, R> Future<R> flatMap(Future<V> future, Function<V, Future<R>> mapper) {
requireNonNull(future, "future");
requireNonNull(mapper, "mapper");
Promise<R> promise = future.executor().newPromise();
future.addListener(new FlatMapper<>(promise, mapper));
if (!future.isSuccess()) {
// Propagate cancellation if future is either incomplete or failed.
// Failed means it could be cancelled, so that needs to be propagated.
promise.addListener(future, propagateCancel());
}
return promise;
}
@SuppressWarnings("unchecked")
static FutureContextListener<Future<?>, Object> propagateCancel() {
return (FutureContextListener<Future<?>, Object>) (FutureContextListener<?, ?>) PROPAGATE_CANCEL;
}
@SuppressWarnings("unchecked")
static <R> FutureContextListener<Promise<R>, Object> passThrough() {
return (FutureContextListener<Promise<R>, Object>) (FutureContextListener<?, ?>) PASS_THROUGH;
}
static <A, B> void propagateUncommonCompletion(Future<? extends A> completed, Promise<B> recipient) {
if (completed.isCancelled()) {
// Don't check or log if cancellation propagation fails.
// Propagation goes both ways, which means at least one future will already be cancelled here.
recipient.cancel(false);
} else {
Throwable cause = completed.cause();
tryFailure(recipient, cause, logger);
}
}
private Futures() {
}
private static final class PropagateCancel implements FutureContextListener<Future<Object>, Object> {
@Override
public void operationComplete(Future<Object> context, Future<?> future) throws Exception {
if (future.isCancelled()) {
context.cancel(false);
}
}
}
private static final class PassThrough<R> implements FutureContextListener<Promise<R>, Object> {
@Override
public void operationComplete(Promise<R> recipient, Future<?> completed) throws Exception {
if (completed.isSuccess()) {
try {
@SuppressWarnings("unchecked")
R result = (R) completed.getNow();
recipient.trySuccess(result);
} catch (Throwable e) {
tryFailure(recipient, e, logger);
}
} else {
propagateUncommonCompletion(completed, recipient);
}
}
}
private static final class CallableMapper<R, T> implements Callable<R> {
private final Future<T> future;
private final Function<T, R> mapper;
CallableMapper(Future<T> future, Function<T, R> mapper) {
this.future = future;
this.mapper = mapper;
}
@Override
public R call() throws Exception {
return mapper.apply(future.getNow());
}
}
private static final class Mapper<R, T> implements FutureListener<Object> {
private final Promise<R> recipient;
private final Function<T, R> mapper;
Mapper(Promise<R> recipient, Function<T, R> mapper) {
this.recipient = recipient;
this.mapper = mapper;
}
@Override
public void operationComplete(Future<?> completed) throws Exception {
if (completed.isSuccess()) {
try {
@SuppressWarnings("unchecked")
T result = (T) completed.getNow();
R mapped = mapper.apply(result);
recipient.trySuccess(mapped);
} catch (Throwable e) {
tryFailure(recipient, e, logger);
}
} else {
propagateUncommonCompletion(completed, recipient);
}
}
}
private static final class FlatMapper<R, T> implements FutureListener<Object> {
private final Promise<R> recipient;
private final Function<T, Future<R>> mapper;
FlatMapper(Promise<R> recipient, Function<T, Future<R>> mapper) {
this.recipient = recipient;
this.mapper = mapper;
}
@Override
public void operationComplete(Future<?> completed) throws Exception {
if (completed.isSuccess()) {
try {
@SuppressWarnings("unchecked")
T result = (T) completed.getNow();
Future<R> future = mapper.apply(result);
if (future.isSuccess()) {
recipient.trySuccess(future.getNow());
} else if (future.isFailed()) {
propagateUncommonCompletion(future, recipient);
} else {
future.addListener(recipient, passThrough());
recipient.addListener(future, propagateCancel());
}
} catch (Throwable e) {
tryFailure(recipient, e, logger);
}
} else {
propagateUncommonCompletion(completed, recipient);
}
}
}
}

View File

@ -72,5 +72,4 @@ public final class PromiseNotificationUtil {
} }
} }
} }
} }

View File

@ -38,6 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.util.concurrent.ImmediateEventExecutor.INSTANCE;
import static java.lang.Math.max; import static java.lang.Math.max;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -172,21 +173,21 @@ public class DefaultPromiseTest {
@Test @Test
public void testCancellationExceptionIsThrownWhenBlockingGet() throws Exception { public void testCancellationExceptionIsThrownWhenBlockingGet() throws Exception {
DefaultPromise<Void> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Void> promise = new DefaultPromise<>(INSTANCE);
assertTrue(promise.cancel(false)); assertTrue(promise.cancel(false));
assertThrows(CancellationException.class, promise::get); assertThrows(CancellationException.class, promise::get);
} }
@Test @Test
public void testCancellationExceptionIsThrownWhenBlockingGetWithTimeout() throws Exception { public void testCancellationExceptionIsThrownWhenBlockingGetWithTimeout() throws Exception {
DefaultPromise<Void> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Void> promise = new DefaultPromise<>(INSTANCE);
assertTrue(promise.cancel(false)); assertTrue(promise.cancel(false));
assertThrows(CancellationException.class, () -> promise.get(1, TimeUnit.SECONDS)); assertThrows(CancellationException.class, () -> promise.get(1, TimeUnit.SECONDS));
} }
@Test @Test
public void testCancellationExceptionIsReturnedAsCause() throws Exception { public void testCancellationExceptionIsReturnedAsCause() throws Exception {
DefaultPromise<Void> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Void> promise = new DefaultPromise<>(INSTANCE);
assertTrue(promise.cancel(false)); assertTrue(promise.cancel(false));
assertThat(promise.cause()).isInstanceOf(CancellationException.class); assertThat(promise.cause()).isInstanceOf(CancellationException.class);
assertTrue(promise.isFailed()); assertTrue(promise.isFailed());
@ -194,8 +195,8 @@ public class DefaultPromiseTest {
@Test @Test
public void testStackOverflowWithImmediateEventExecutorA() throws Exception { public void testStackOverflowWithImmediateEventExecutorA() throws Exception {
testStackOverFlowChainedFuturesA(stackOverflowTestDepth(), ImmediateEventExecutor.INSTANCE, true); testStackOverFlowChainedFuturesA(stackOverflowTestDepth(), INSTANCE, true);
testStackOverFlowChainedFuturesA(stackOverflowTestDepth(), ImmediateEventExecutor.INSTANCE, false); testStackOverFlowChainedFuturesA(stackOverflowTestDepth(), INSTANCE, false);
} }
@Test @Test
@ -216,8 +217,8 @@ public class DefaultPromiseTest {
@Test @Test
public void testNoStackOverflowWithImmediateEventExecutorB() throws Exception { public void testNoStackOverflowWithImmediateEventExecutorB() throws Exception {
testStackOverFlowChainedFuturesB(stackOverflowTestDepth(), ImmediateEventExecutor.INSTANCE, true); testStackOverFlowChainedFuturesB(stackOverflowTestDepth(), INSTANCE, true);
testStackOverFlowChainedFuturesB(stackOverflowTestDepth(), ImmediateEventExecutor.INSTANCE, false); testStackOverFlowChainedFuturesB(stackOverflowTestDepth(), INSTANCE, false);
} }
@Test @Test
@ -354,7 +355,7 @@ public class DefaultPromiseTest {
@Test @Test
public void signalUncancellableCompletionValue() { public void signalUncancellableCompletionValue() {
DefaultPromise<Signal> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Signal> promise = new DefaultPromise<>(INSTANCE);
promise.setSuccess(Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE")); promise.setSuccess(Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE"));
assertTrue(promise.isDone()); assertTrue(promise.isDone());
assertTrue(promise.isSuccess()); assertTrue(promise.isSuccess());
@ -363,7 +364,7 @@ public class DefaultPromiseTest {
@Test @Test
public void signalSuccessCompletionValue() { public void signalSuccessCompletionValue() {
DefaultPromise<Signal> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Signal> promise = new DefaultPromise<>(INSTANCE);
promise.setSuccess(Signal.valueOf(DefaultPromise.class, "SUCCESS")); promise.setSuccess(Signal.valueOf(DefaultPromise.class, "SUCCESS"));
assertTrue(promise.isDone()); assertTrue(promise.isDone());
assertTrue(promise.isSuccess()); assertTrue(promise.isSuccess());
@ -372,7 +373,7 @@ public class DefaultPromiseTest {
@Test @Test
public void setUncancellableGetNow() { public void setUncancellableGetNow() {
DefaultPromise<String> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<String> promise = new DefaultPromise<>(INSTANCE);
assertThrows(IllegalStateException.class, () -> promise.getNow()); assertThrows(IllegalStateException.class, () -> promise.getNow());
assertFalse(promise.isDone()); assertFalse(promise.isDone());
assertTrue(promise.setUncancellable()); assertTrue(promise.setUncancellable());
@ -392,7 +393,7 @@ public class DefaultPromiseTest {
@Test @Test
public void throwUncheckedSync() throws InterruptedException { public void throwUncheckedSync() throws InterruptedException {
Exception exception = new Exception(); Exception exception = new Exception();
DefaultPromise<String> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<String> promise = new DefaultPromise<>(INSTANCE);
promise.setFailure(exception); promise.setFailure(exception);
assertTrue(promise.isFailed()); assertTrue(promise.isFailed());
@ -406,7 +407,7 @@ public class DefaultPromiseTest {
@Test @Test
public void throwUncheckedSyncUninterruptibly() { public void throwUncheckedSyncUninterruptibly() {
Exception exception = new Exception(); Exception exception = new Exception();
DefaultPromise<String> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<String> promise = new DefaultPromise<>(INSTANCE);
promise.setFailure(exception); promise.setFailure(exception);
assertTrue(promise.isFailed()); assertTrue(promise.isFailed());
@ -419,14 +420,14 @@ public class DefaultPromiseTest {
@Test @Test
public void throwCancelled() throws InterruptedException { public void throwCancelled() throws InterruptedException {
DefaultPromise<String> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<String> promise = new DefaultPromise<>(INSTANCE);
promise.cancel(true); promise.cancel(true);
assertThrows(CancellationException.class, promise::sync); assertThrows(CancellationException.class, promise::sync);
} }
@Test @Test
public void mustPassContextToContextListener() { public void mustPassContextToContextListener() {
DefaultPromise<Object> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Object> promise = new DefaultPromise<>(INSTANCE);
Object context = new Object(); Object context = new Object();
Object result = new Object(); Object result = new Object();
promise.addListener(context, (ctx, future) -> { promise.addListener(context, (ctx, future) -> {
@ -439,7 +440,7 @@ public class DefaultPromiseTest {
@Test @Test
public void mustPassNullContextToContextListener() { public void mustPassNullContextToContextListener() {
DefaultPromise<Object> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Object> promise = new DefaultPromise<>(INSTANCE);
Object result = new Object(); Object result = new Object();
promise.addListener(null, (ctx, future) -> { promise.addListener(null, (ctx, future) -> {
assertNull(ctx); assertNull(ctx);
@ -451,14 +452,14 @@ public class DefaultPromiseTest {
@Test @Test
public void getNowOnUnfinishedPromiseMustThrow() { public void getNowOnUnfinishedPromiseMustThrow() {
DefaultPromise<Object> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Object> promise = new DefaultPromise<>(INSTANCE);
assertThrows(IllegalStateException.class, () -> promise.getNow()); assertThrows(IllegalStateException.class, () -> promise.getNow());
} }
@SuppressWarnings("ThrowableNotThrown") @SuppressWarnings("ThrowableNotThrown")
@Test @Test
public void causeOnUnfinishedPromiseMustThrow() { public void causeOnUnfinishedPromiseMustThrow() {
DefaultPromise<Object> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Object> promise = new DefaultPromise<>(INSTANCE);
assertThrows(IllegalStateException.class, () -> promise.cause()); assertThrows(IllegalStateException.class, () -> promise.cause());
} }
@ -601,7 +602,7 @@ public class DefaultPromiseTest {
private static void testPromiseListenerAddWhenComplete(Throwable cause) throws InterruptedException { private static void testPromiseListenerAddWhenComplete(Throwable cause) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
DefaultPromise<Void> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE); DefaultPromise<Void> promise = new DefaultPromise<>(INSTANCE);
promise.addListener(future -> promise.addListener(future ->
promise.addListener(future1 -> latch.countDown())); promise.addListener(future1 -> latch.countDown()));
if (cause == null) { if (cause == null) {
@ -631,12 +632,6 @@ public class DefaultPromiseTest {
"Should have notified " + expectedCount + " listeners"); "Should have notified " + expectedCount + " listeners");
} }
private static final class TestEventExecutor extends SingleThreadEventExecutor {
TestEventExecutor() {
super(Executors.defaultThreadFactory());
}
}
private static RuntimeException fakeException() { private static RuntimeException fakeException() {
return new RuntimeException("fake exception"); return new RuntimeException("fake exception");
} }

View File

@ -0,0 +1,215 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.util.concurrent.DefaultPromise.newSuccessfulPromise;
import static io.netty.util.concurrent.ImmediateEventExecutor.INSTANCE;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class FuturesTest {
@Test
public void mapMustApplyMapperFunctionWhenFutureSucceeds() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Future<String> strFut = promise.map(i -> i.toString());
promise.setSuccess(42);
assertThat(strFut.getNow()).isEqualTo("42");
}
@Test
public void mapMustApplyMapperFunctionOnSuccededFuture() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
promise.setSuccess(42);
assertThat(promise.map(i -> i.toString()).getNow()).isEqualTo("42");
}
@Test
public void mapOnFailedFutureMustProduceFailedFuture() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Exception cause = new Exception("boom");
promise.setFailure(cause);
assertThat(promise.map(i -> i.toString()).cause()).isSameAs(cause);
}
@Test
public void mapOnFailedFutureMustNotApplyMapperFunction() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Exception cause = new Exception("boom");
promise.setFailure(cause);
AtomicInteger counter = new AtomicInteger();
assertThat(promise.map(i -> {
counter.getAndIncrement();
return i.toString();
}).cause()).isSameAs(cause);
assertThat(counter.get()).isZero();
}
@Test
public void mapMustFailReturnedFutureWhenMapperFunctionThrows() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
RuntimeException cause = new RuntimeException("boom");
Future<Object> future = promise.map(i -> {
throw cause;
});
promise.setSuccess(42);
assertThat(future.cause()).isSameAs(cause);
}
@Test
public void mapMustNotFailOriginalFutureWhenMapperFunctionThrows() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
promise.map(i -> {
throw new RuntimeException("boom");
});
promise.setSuccess(42);
assertThat(promise.getNow()).isEqualTo(42);
}
@Test
public void cancelOnFutureFromMapMustCancelOriginalFuture() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Future<String> strFut = promise.map(i -> i.toString());
strFut.cancel(false);
assertTrue(promise.isCancelled());
assertTrue(strFut.isCancelled());
}
@Test
public void cancelOnOriginalFutureMustCancelFutureFromMap() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Future<String> strFut = promise.map(i -> i.toString());
promise.cancel(false);
assertTrue(promise.isCancelled());
assertTrue(strFut.isCancelled());
}
@Test
public void flatMapMustApplyMapperFunctionWhenFutureSucceeds() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Future<String> strFut = promise.flatMap(i -> newSuccessfulPromise(INSTANCE, i.toString()));
promise.setSuccess(42);
assertThat(strFut.getNow()).isEqualTo("42");
}
@Test
public void flatMapMustApplyMapperFunctionOnSuccededFuture() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
promise.setSuccess(42);
assertThat(promise.flatMap(i -> newSuccessfulPromise(INSTANCE, i.toString())).getNow()).isEqualTo("42");
}
@Test
public void flatMapOnFailedFutureMustProduceFailedFuture() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Exception cause = new Exception("boom");
promise.setFailure(cause);
assertThat(promise.flatMap(i -> newSuccessfulPromise(INSTANCE, i.toString())).cause()).isSameAs(cause);
}
@Test
public void flatMapOnFailedFutureMustNotApplyMapperFunction() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Exception cause = new Exception("boom");
promise.setFailure(cause);
AtomicInteger counter = new AtomicInteger();
assertThat(promise.flatMap(i -> {
counter.getAndIncrement();
return newSuccessfulPromise(INSTANCE, i.toString());
}).cause()).isSameAs(cause);
assertThat(counter.get()).isZero();
}
@Test
public void flatMapMustFailReturnedFutureWhenMapperFunctionThrows() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
RuntimeException cause = new RuntimeException("boom");
Future<Object> future = promise.flatMap(i -> {
throw cause;
});
promise.setSuccess(42);
assertThat(future.cause()).isSameAs(cause);
}
@Test
public void flatMapMustNotFailOriginalFutureWhenMapperFunctionThrows() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
promise.flatMap(i -> {
throw new RuntimeException("boom");
});
promise.setSuccess(42);
assertThat(promise.getNow()).isEqualTo(42);
}
@Test
public void cancelOnFutureFromFlatMapMustCancelOriginalFuture() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Future<String> strFut = promise.flatMap(i -> newSuccessfulPromise(INSTANCE, i.toString()));
strFut.cancel(false);
assertTrue(promise.isCancelled());
assertTrue(strFut.isCancelled());
}
@Test
public void cancelOnOriginalFutureMustCancelFutureFromFlatMap() {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Future<String> strFut = promise.flatMap(i -> newSuccessfulPromise(INSTANCE, i.toString()));
promise.cancel(false);
assertTrue(promise.isCancelled());
assertTrue(strFut.isCancelled());
}
@Test
public void cancelOnFutureFromFlatMapMapperMustCancelReturnedFuture() throws Exception {
DefaultPromise<Integer> promise = new DefaultPromise<>(INSTANCE);
Future<String> strFut = promise.flatMap(i -> {
Future<String> future = new DefaultPromise<>(INSTANCE);
future.cancel(false);
return future;
});
promise.setSuccess(42);
assertTrue(strFut.await(5, SECONDS));
assertTrue(strFut.isCancelled());
}
@Test
public void futureFromFlatMapMustNotCompleteUntilMappedFutureCompletes() throws Exception {
TestEventExecutor executor = new TestEventExecutor();
DefaultPromise<Integer> promise = new DefaultPromise<>(executor);
CountDownLatch mappingLatchEnter = new CountDownLatch(1);
CountDownLatch mappingLatchExit = new CountDownLatch(1);
Future<String> strFut = promise.flatMap(i -> {
return executor.submit(() -> {
mappingLatchEnter.countDown();
mappingLatchExit.await();
return i.toString();
});
});
executor.submit(() -> promise.setSuccess(42));
mappingLatchEnter.await();
assertFalse(strFut.await(100));
mappingLatchExit.countDown();
assertThat(strFut.get(5, SECONDS)).isEqualTo("42");
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.Executors;
final class TestEventExecutor extends SingleThreadEventExecutor {
TestEventExecutor() {
super(Executors.defaultThreadFactory());
}
}

View File

@ -297,7 +297,6 @@ public class BootstrapTest {
Channel channel = bootstrapA.createUnregistered(); Channel channel = bootstrapA.createUnregistered();
Future<Void> registerFuture = channel.register(); Future<Void> registerFuture = channel.register();
Future<Void> connectFuture = channel.connect(LocalAddress.ANY); Future<Void> connectFuture = channel.connect(LocalAddress.ANY);
assertFalse(connectFuture.isDone());
registerHandler.registerPromise().setSuccess(null); registerHandler.registerPromise().setSuccess(null);
registerFuture.sync(); registerFuture.sync();
CompletionException exception = CompletionException exception =