From 81244e1ae10aa425c2916a9f69a0727828f8efa3 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 11 Apr 2019 14:52:33 +0200 Subject: [PATCH] =?UTF-8?q?Introduce=20Future.toStage()=20which=20allows?= =?UTF-8?q?=20to=20obtain=20a=20CompletionStage=20a=E2=80=A6=20(#9004)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: CompletionStage is the new standard for async operation chaining in JDK8+ that is supported by various of libs. To make it easer to interopt with other libs and to allow users to make good use of lambdas and functional programming style we should allow to convert from our Future to a CompletionStage while still provide the same ordering guarantees. The reason why we expose this as toStage() and not jus have Future extend CompletionStage is for two reasons: - Keep our interface norrow - Keep semantics clear (Future.addListener(...) methods return this while all chaining methods of CompletionStage return a new instance). Modifications: - Merge implements in AbstractFuture to Future (by make these default methods) - Add Future.toStage() as a default method and a special implemention in DefaultPromise (to reduce GC). - Add Future.executor() which returns the EventExecutor that is pinned to the Future - Introduce FutureCompletionStage that extends CompletionStage to clarify threading semantics and guarantees. Result: Easier inter-op with other Java8+ libaries. Related to https://github.com/netty/netty/issues/8523. --- .../netty/util/concurrent/AbstractFuture.java | 58 - .../AbstractScheduledEventExecutor.java | 5 + .../netty/util/concurrent/CompleteFuture.java | 18 +- .../DefaultFutureCompletionStage.java | 563 +++++++++ .../netty/util/concurrent/DefaultPromise.java | 20 +- .../java/io/netty/util/concurrent/Future.java | 45 +- .../concurrent/FutureCompletionStage.java | 213 ++++ .../concurrent/RunnableFutureAdapter.java | 5 + .../RunnableScheduledFutureAdapter.java | 5 + .../DefaultFutureCompletionStageTest.java | 1083 +++++++++++++++++ .../concurrent/FutureCompletionStageTest.java | 68 ++ .../netty/channel/CompleteChannelFuture.java | 2 +- .../DelegatingChannelPromiseNotifier.java | 6 + .../io/netty/channel/VoidChannelPromise.java | 9 +- .../channel/group/DefaultChannelGroup.java | 2 +- .../channel/group/VoidChannelGroupFuture.java | 10 +- 16 files changed, 2043 insertions(+), 69 deletions(-) delete mode 100644 common/src/main/java/io/netty/util/concurrent/AbstractFuture.java create mode 100644 common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java create mode 100644 common/src/main/java/io/netty/util/concurrent/FutureCompletionStage.java create mode 100644 common/src/test/java/io/netty/util/concurrent/DefaultFutureCompletionStageTest.java create mode 100644 common/src/test/java/io/netty/util/concurrent/FutureCompletionStageTest.java diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractFuture.java b/common/src/main/java/io/netty/util/concurrent/AbstractFuture.java deleted file mode 100644 index c0a95dedfd..0000000000 --- a/common/src/main/java/io/netty/util/concurrent/AbstractFuture.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Abstract {@link Future} implementation which does not allow for cancellation. - * - * @param - */ -public abstract class AbstractFuture implements Future { - - @Override - public V get() throws InterruptedException, ExecutionException { - await(); - - Throwable cause = cause(); - if (cause == null) { - return getNow(); - } - if (cause instanceof CancellationException) { - throw (CancellationException) cause; - } - throw new ExecutionException(cause); - } - - @Override - public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (await(timeout, unit)) { - Throwable cause = cause(); - if (cause == null) { - return getNow(); - } - if (cause instanceof CancellationException) { - throw (CancellationException) cause; - } - throw new ExecutionException(cause); - } - throw new TimeoutException(); - } -} diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 49b2fcf7f5..184dd02f86 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -283,6 +283,11 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut this.future = future; } + @Override + public EventExecutor executor() { + return future.executor(); + } + @Override public long deadlineNanos() { return future.deadlineNanos(); diff --git a/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java b/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java index 2bf4b7869d..8099d42c85 100644 --- a/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/CompleteFuture.java @@ -23,10 +23,14 @@ import java.util.concurrent.TimeUnit; /** * A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already. */ -public abstract class CompleteFuture extends AbstractFuture { +public abstract class CompleteFuture implements Future { private final EventExecutor executor; + // It is fine to not make this volatile as even if we override the value in there it does not matter as + // DefaultFutureCompletionStage has no state itself and is just a wrapper around this CompletableFuture instance. + private DefaultFutureCompletionStage stage; + /** * Creates a new instance. * @@ -39,7 +43,8 @@ public abstract class CompleteFuture extends AbstractFuture { /** * Return the {@link EventExecutor} which is used by this {@link CompleteFuture}. */ - protected EventExecutor executor() { + @Override + public EventExecutor executor() { return executor; } @@ -147,4 +152,13 @@ public abstract class CompleteFuture extends AbstractFuture { public boolean cancel(boolean mayInterruptIfRunning) { return false; } + + @Override + public FutureCompletionStage asStage() { + DefaultFutureCompletionStage stageAdapter = stage; + if (stageAdapter == null) { + stage = stageAdapter = new DefaultFutureCompletionStage<>(this); + } + return stageAdapter; + } } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java b/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java new file mode 100644 index 0000000000..c024cd9d2a --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java @@ -0,0 +1,563 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; + +/** + * Wraps a {@link io.netty.util.concurrent.Future} and provides a {@link FutureCompletionStage} implementation + * on top of it. + * + * @param the value type. + */ +final class DefaultFutureCompletionStage implements FutureCompletionStage { + private enum Marker { + EMPTY, + ERROR + } + + // Just a marker + private static final Executor SAME_AS_FUTURE = task -> { + throw new UnsupportedOperationException("Only a marker, should never been called!"); + }; + + private final Future future; + + DefaultFutureCompletionStage(Future future) { + this.future = future; + } + + @Override + public Future future() { + return future; + } + + @Override + public FutureCompletionStage thenApply(Function fn) { + return thenApplyAsync(fn, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage thenApplyAsync(Function fn) { + return thenApplyAsync(fn, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage thenAccept(Consumer action) { + return thenAcceptAsync(action, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage thenAcceptAsync(Consumer action) { + return thenAcceptAsync(action, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage thenRun(Runnable action) { + return thenRunAsync(action, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage thenRunAsync(Runnable action) { + return thenRunAsync(action, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage thenCombine( + CompletionStage other, BiFunction fn) { + return thenCombineAsync(other, fn, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage thenCombineAsync( + CompletionStage other, BiFunction fn) { + return thenCombineAsync(other, fn, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage thenAcceptBoth( + CompletionStage other, BiConsumer action) { + return thenAcceptBothAsync(other, action, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage thenAcceptBothAsync( + CompletionStage other, BiConsumer action) { + return thenAcceptBothAsync(other, action, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage runAfterBoth(CompletionStage other, Runnable action) { + return runAfterBothAsync(other, action, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage runAfterBothAsync(CompletionStage other, Runnable action) { + return runAfterBothAsync(other, action, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage applyToEither( + CompletionStage other, Function fn) { + return applyToEitherAsync(other, fn, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage applyToEitherAsync( + CompletionStage other, Function fn) { + return applyToEitherAsync(other, fn, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage acceptEither(CompletionStage other, Consumer action) { + return acceptEitherAsync(other, action, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage acceptEitherAsync( + CompletionStage other, Consumer action) { + return acceptEitherAsync(other, action, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage runAfterEither(CompletionStage other, Runnable action) { + return runAfterEitherAsync(other, action, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage runAfterEitherAsync(CompletionStage other, Runnable action) { + return runAfterEitherAsync(other, action, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage thenCompose(Function> fn) { + return thenComposeAsync(fn, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage thenComposeAsync(Function> fn) { + return thenComposeAsync(fn, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage whenComplete(BiConsumer action) { + return whenCompleteAsync(action, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage whenCompleteAsync(BiConsumer action) { + return whenCompleteAsync(action, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage handle(BiFunction fn) { + return handleAsync(fn, SAME_AS_FUTURE); + } + + @Override + public FutureCompletionStage handleAsync(BiFunction fn) { + return handleAsync(fn, ForkJoinPool.commonPool()); + } + + @Override + public FutureCompletionStage thenApplyAsync(Function fn, Executor executor) { + requireNonNull(fn, "fn"); + requireNonNull(executor, "executor"); + + Promise promise = executor().newPromise(); + future.addListener(future -> { + Throwable cause = future.cause(); + if (cause == null) { + @SuppressWarnings("unchecked") V value = (V) future.getNow(); + if (executeDirectly(executor)) { + thenApplyAsync0(promise, value, fn); + } else { + safeExecute(executor, () -> thenApplyAsync0(promise, value, fn), promise); + } + } else { + promise.setFailure(cause); + } + }); + return promise.asStage(); + } + + private static void thenApplyAsync0(Promise promise, V value, Function fn) { + final U result; + try { + result = fn.apply(value); + } catch (Throwable cause) { + promise.setFailure(cause); + return; + } + promise.setSuccess(result); + } + + @Override + public FutureCompletionStage thenAcceptAsync(Consumer action, Executor executor) { + requireNonNull(action, "action"); + requireNonNull(executor, "executor"); + + Promise promise = executor().newPromise(); + future.addListener(future -> { + Throwable cause = future.cause(); + if (cause == null) { + @SuppressWarnings("unchecked") V value = (V) future.getNow(); + if (executeDirectly(executor)) { + thenAcceptAsync0(promise, value, action); + } else { + safeExecute(executor, () -> thenAcceptAsync0(promise, value, action), promise); + } + } else { + promise.setFailure(cause); + } + }); + return promise.asStage(); + } + + private static void thenAcceptAsync0(Promise promise, V value, Consumer action) { + try { + action.accept(value); + promise.setSuccess(null); + } catch (Throwable cause) { + promise.setFailure(cause); + } + } + + @Override + public FutureCompletionStage thenRunAsync(Runnable action, Executor executor) { + return thenAcceptAsync(ignore -> action.run(), executor); + } + + @Override + public FutureCompletionStage thenCombineAsync( + CompletionStage other, BiFunction fn, Executor executor) { + requireNonNull(other, "other"); + requireNonNull(fn, "fn"); + requireNonNull(executor, "executor"); + + Promise promise = executor().newPromise(); + AtomicReference reference = new AtomicReference<>(Marker.EMPTY); + + abstract class CombineBiConsumer implements BiConsumer { + @SuppressWarnings("unchecked") + @Override + public void accept(T v, Throwable error) { + if (error == null) { + if (!reference.compareAndSet(Marker.EMPTY, v)) { + Object rawValue = reference.get(); + if (rawValue == Marker.ERROR) { + return; + } + applyAndNotify0(promise, (T1) v, (T2) rawValue, fn); + } + } else { + if (reference.getAndSet(Marker.ERROR) != Marker.ERROR) { + // Did not fail the promise before, do it now. + promise.setFailure(error); + } + } + } + + abstract void applyAndNotify0( + Promise promise, T1 value1, T2 value2, BiFunction fn); + } + + whenCompleteAsync(new CombineBiConsumer() { + @Override + void applyAndNotify0( + Promise promise, V value1, U value2, BiFunction fn) { + applyAndNotify(promise, value1, value2, fn); + } + }, executor); + other.whenCompleteAsync(new CombineBiConsumer() { + @Override + void applyAndNotify0( + Promise promise, U value1, V value2, BiFunction fn) { + applyAndNotify(promise, value2, value1, fn); + } + }, otherExecutor(executor)); + return promise.asStage(); + } + + private Executor otherExecutor(Executor executor) { + return executor == SAME_AS_FUTURE ? executor() : executor; + } + + @Override + public FutureCompletionStage thenAcceptBothAsync( + CompletionStage other, BiConsumer action, Executor executor) { + requireNonNull(action, "action"); + return thenCombineAsync(other, (value, value2) -> { + action.accept(value, value2); + return null; + }, executor); + } + + @Override + public FutureCompletionStage runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + requireNonNull(action, "action"); + return thenCombineAsync(other, (ignoreOtherValue, ignoreError) -> { + action.run(); + return null; + }, executor); + } + + @Override + public FutureCompletionStage applyToEitherAsync( + CompletionStage other, Function fn, Executor executor) { + requireNonNull(other, "other"); + requireNonNull(fn, "fn"); + + Promise promise = executor().newPromise(); + BiConsumer consumer = new AtomicBiConsumer(promise) { + @Override + protected U apply(V value) { + return fn.apply(value); + } + }; + whenCompleteAsync(consumer, executor); + other.whenCompleteAsync(consumer, otherExecutor(executor)); + return promise.asStage(); + } + + @Override + public FutureCompletionStage acceptEitherAsync( + CompletionStage other, Consumer action, Executor executor) { + requireNonNull(other, "other"); + requireNonNull(action, "action"); + + Promise promise = executor().newPromise(); + BiConsumer consumer = new AtomicBiConsumer(promise) { + @Override + protected Void apply(V value) { + action.accept(value); + return null; + } + }; + whenCompleteAsync(consumer, executor); + other.whenCompleteAsync(consumer, otherExecutor(executor)); + return promise.asStage(); + } + + @Override + public FutureCompletionStage runAfterEitherAsync( + CompletionStage other, Runnable action, Executor executor) { + requireNonNull(other, "other"); + requireNonNull(action, "action"); + + Promise promise = executor().newPromise(); + BiConsumer consumer = new AtomicBiConsumer(promise) { + @Override + protected Void apply(Object value) { + action.run(); + return null; + } + }; + whenCompleteAsync(consumer, executor); + other.whenCompleteAsync(consumer, otherExecutor(executor)); + return promise.asStage(); + } + + @Override + public FutureCompletionStage thenComposeAsync( + Function> fn, Executor executor) { + requireNonNull(fn, "fn"); + requireNonNull(executor, "executor"); + + Promise promise = executor().newPromise(); + future.addListener(f -> { + Throwable cause = f.cause(); + if (cause == null) { + @SuppressWarnings("unchecked") V value = (V) f.getNow(); + if (executeDirectly(executor)) { + thenComposeAsync0(promise, fn, value); + } else { + safeExecute(executor, () -> thenComposeAsync0(promise, fn, value), promise); + } + } else { + promise.setFailure(cause); + } + }); + return promise.asStage(); + } + + private static void thenComposeAsync0( + Promise promise, Function> fn, V value) { + final CompletionStage result; + try { + result = fn.apply(value); + } catch (Throwable cause) { + promise.setFailure(cause); + return; + } + result.whenComplete((v, error) -> { + if (error == null) { + promise.setSuccess(v); + } else { + promise.setFailure(error); + } + }); + } + + @Override + public FutureCompletionStage exceptionally(Function fn) { + requireNonNull(fn, "fn"); + + Promise promise = executor().newPromise(); + future.addListener(f -> { + Throwable error = f.cause(); + if (error == null) { + @SuppressWarnings("unchecked") V value = (V) f.getNow(); + promise.setSuccess(value); + } else { + final V result; + try { + result = fn.apply(error); + } catch (Throwable cause) { + promise.setFailure(cause); + return; + } + promise.setSuccess(result); + } + }); + return promise.asStage(); + } + + @Override + public FutureCompletionStage whenCompleteAsync( + BiConsumer action, Executor executor) { + requireNonNull(action, "action"); + requireNonNull(executor, "executor"); + + Promise promise = executor().newPromise(); + future.addListener(f -> { + if (executeDirectly(executor)) { + whenCompleteAsync0(promise, f, action); + } else { + safeExecute(executor, () -> whenCompleteAsync0(promise, f, action), promise); + } + }); + return promise.asStage(); + } + + private static void whenCompleteAsync0( + Promise promise, Future f, BiConsumer action) { + Throwable cause = f.cause(); + + @SuppressWarnings("unchecked") V value = cause == null ? (V) f.getNow() : null; + try { + action.accept(value, cause); + } catch (Throwable error) { + promise.setFailure(error); + return; + } + + if (cause == null) { + promise.setSuccess(value); + } else { + promise.setFailure(cause); + } + } + + @Override + public FutureCompletionStage handleAsync( + BiFunction fn, Executor executor) { + requireNonNull(fn, "fn"); + requireNonNull(executor, "executor"); + + Promise promise = executor().newPromise(); + future.addListener(f -> { + if (executeDirectly(executor)) { + handleAsync0(promise, f, fn); + } else { + safeExecute(executor, () -> handleAsync0(promise, f, fn), promise); + } + }); + return promise.asStage(); + } + + @SuppressWarnings("unchecked") + private static void handleAsync0( + Promise promise, Future f, BiFunction fn) { + Throwable cause = f.cause(); + applyAndNotify(promise, cause == null ? (V) f.getNow() : null, cause, fn); + } + + private static void applyAndNotify( + Promise promise, V value, T value2, BiFunction fn) { + final U result; + try { + result = fn.apply(value, value2); + } catch (Throwable error) { + promise.setFailure(error); + return; + } + promise.setSuccess(result); + } + + private static boolean executeDirectly(Executor executor) { + return executor == SAME_AS_FUTURE; + } + + private static void safeExecute(Executor executor, Runnable task, Promise promise) { + try { + executor.execute(task); + } catch (Throwable cause) { + promise.setFailure(cause); + } + } + + private abstract static class AtomicBiConsumer extends AtomicReference + implements BiConsumer { + + private final Promise promise; + + AtomicBiConsumer(Promise promise) { + super(Marker.EMPTY); + this.promise = promise; + } + + @Override + public void accept(V v, Throwable error) { + if (error == null) { + if (compareAndSet(Marker.EMPTY, v)) { + final U value; + try { + value = apply(v); + } catch (Throwable cause) { + promise.setFailure(cause); + return; + } + promise.setSuccess(value); + } + } else if (compareAndSet(Marker.EMPTY, Marker.ERROR)) { + promise.setFailure(error); + } + } + + protected abstract U apply(V value); + } +} diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index e66582a4ab..2f6547d8d3 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; -public class DefaultPromise extends AbstractFuture implements Promise { +public class DefaultPromise implements Promise { private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class); private static final InternalLogger rejectedExecutionLogger = InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); @@ -47,6 +47,11 @@ public class DefaultPromise extends AbstractFuture implements Promise { private volatile Object result; private final EventExecutor executor; + + // It is fine to not make this volatile as even if we override the value in there it does not matter as + // DefaultFutureCompletionStage has no state itself and is just a wrapper around this DefaultPromise instance. + private DefaultFutureCompletionStage stage; + /** * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}. * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified. @@ -73,6 +78,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { */ public DefaultPromise(EventExecutor executor) { this.executor = requireNonNull(executor, "executor"); + stage = new DefaultFutureCompletionStage<>(this); } @Override @@ -363,7 +369,8 @@ public class DefaultPromise extends AbstractFuture implements Promise { * depth exceeds a threshold. * @return The executor used to notify listeners when this promise is complete. */ - protected final EventExecutor executor() { + @Override + public final EventExecutor executor() { return executor; } @@ -736,4 +743,13 @@ public class DefaultPromise extends AbstractFuture implements Promise { rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t); } } + + @Override + public FutureCompletionStage asStage() { + DefaultFutureCompletionStage stageAdapter = stage; + if (stageAdapter == null) { + stage = stageAdapter = new DefaultFutureCompletionStage<>(this); + } + return stageAdapter; + } } diff --git a/common/src/main/java/io/netty/util/concurrent/Future.java b/common/src/main/java/io/netty/util/concurrent/Future.java index 5c49023a78..907c182596 100644 --- a/common/src/main/java/io/netty/util/concurrent/Future.java +++ b/common/src/main/java/io/netty/util/concurrent/Future.java @@ -16,8 +16,9 @@ package io.netty.util.concurrent; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.TimeoutException; /** * The result of an asynchronous operation. @@ -174,4 +175,46 @@ public interface Future extends java.util.concurrent.Future { */ @Override boolean cancel(boolean mayInterruptIfRunning); + + /** + * Returns the {@link EventExecutor} that is tied to this {@link Future}. + */ + EventExecutor executor(); + + @Override + default V get() throws InterruptedException, ExecutionException { + await(); + + Throwable cause = cause(); + if (cause == null) { + return getNow(); + } + if (cause instanceof CancellationException) { + throw (CancellationException) cause; + } + throw new ExecutionException(cause); + } + + @Override + default V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (await(timeout, unit)) { + Throwable cause = cause(); + if (cause == null) { + return getNow(); + } + if (cause instanceof CancellationException) { + throw (CancellationException) cause; + } + throw new ExecutionException(cause); + } + throw new TimeoutException(); + } + + /** + * Returns a {@link FutureCompletionStage} that reflects the state of this {@link Future} and so will receive + * all updates as well. + */ + default FutureCompletionStage asStage() { + return new DefaultFutureCompletionStage<>(this); + } } diff --git a/common/src/main/java/io/netty/util/concurrent/FutureCompletionStage.java b/common/src/main/java/io/netty/util/concurrent/FutureCompletionStage.java new file mode 100644 index 0000000000..bcf1e26328 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/FutureCompletionStage.java @@ -0,0 +1,213 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import io.netty.util.internal.StringUtil; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A {@link CompletionStage} that provides the same threading semantics and guarantees as the underlying + * {@link io.netty.util.concurrent.Future}, which means that all the callbacks will be executed by {@link #executor()} + * if not specified otherwise (by calling the corresponding *Async methods). + * + * Please be aware that {@link FutureCompletionStage#toCompletableFuture()} is not supported and so will throw + * a {@link UnsupportedOperationException} when invoked. + * + * @param the value type. + */ +public interface FutureCompletionStage extends CompletionStage { + + /** + * Returns the underlying {@link Future} of this {@link FutureCompletionStage}. + */ + Future future(); + + /** + * See {@link Future#executor()}. + */ + default EventExecutor executor() { + return future().executor(); + } + + /** + * Not supported and so throws an {@link UnsupportedOperationException}. + */ + @Override + default CompletableFuture toCompletableFuture() { + throw new UnsupportedOperationException("Not supported by " + + StringUtil.simpleClassName(FutureCompletionStage.class)); + } + + @Override + FutureCompletionStage thenApply(Function fn); + + @Override + FutureCompletionStage thenApplyAsync(Function fn); + + @Override + FutureCompletionStage thenAccept(Consumer action); + + @Override + FutureCompletionStage thenAcceptAsync(Consumer action); + + @Override + FutureCompletionStage thenRun(Runnable action); + + @Override + FutureCompletionStage thenRunAsync(Runnable action); + + @Override + FutureCompletionStage thenCombine( + CompletionStage other, BiFunction fn); + + @Override + FutureCompletionStage thenCombineAsync( + CompletionStage other, BiFunction fn); + + @Override + FutureCompletionStage thenAcceptBoth( + CompletionStage other, BiConsumer action); + + @Override + FutureCompletionStage thenAcceptBothAsync( + CompletionStage other, BiConsumer action); + + @Override + FutureCompletionStage runAfterBoth(CompletionStage other, Runnable action); + + @Override + FutureCompletionStage runAfterBothAsync(CompletionStage other, Runnable action); + + @Override + FutureCompletionStage applyToEither(CompletionStage other, Function fn); + + @Override + FutureCompletionStage applyToEitherAsync(CompletionStage other, Function fn); + + @Override + FutureCompletionStage acceptEither(CompletionStage other, Consumer action); + + @Override + FutureCompletionStage acceptEitherAsync(CompletionStage other, Consumer action); + + @Override + FutureCompletionStage runAfterEither(CompletionStage other, Runnable action); + + @Override + FutureCompletionStage runAfterEitherAsync(CompletionStage other, Runnable action); + + @Override + FutureCompletionStage thenCompose(Function> fn); + + @Override + FutureCompletionStage thenComposeAsync(Function> fn); + + @Override + FutureCompletionStage whenComplete(BiConsumer action); + + @Override + FutureCompletionStage whenCompleteAsync(BiConsumer action); + + @Override + FutureCompletionStage handle(BiFunction fn); + + @Override + FutureCompletionStage handleAsync(BiFunction fn); + + @Override + FutureCompletionStage thenApplyAsync(Function fn, Executor executor); + + @Override + FutureCompletionStage thenAcceptAsync(Consumer action, Executor executor); + + @Override + FutureCompletionStage thenRunAsync(Runnable action, Executor executor); + + @Override + FutureCompletionStage thenCombineAsync( + CompletionStage other, BiFunction fn, Executor executor); + + @Override + FutureCompletionStage thenAcceptBothAsync( + CompletionStage other, BiConsumer action, Executor executor); + + @Override + FutureCompletionStage runAfterBothAsync( + CompletionStage other, Runnable action, Executor executor); + + @Override + FutureCompletionStage applyToEitherAsync( + CompletionStage other, Function fn, Executor executor); + + @Override + FutureCompletionStage acceptEitherAsync( + CompletionStage other, Consumer action, Executor executor); + + @Override + FutureCompletionStage runAfterEitherAsync( + CompletionStage other, Runnable action, Executor executor); + + @Override + FutureCompletionStage thenComposeAsync( + Function> fn, Executor executor); + + @Override + FutureCompletionStage exceptionally(Function fn); + + @Override + FutureCompletionStage whenCompleteAsync(BiConsumer action, Executor executor); + + @Override + FutureCompletionStage handleAsync(BiFunction fn, Executor executor); + + /** + * Returns a {@link FutureCompletionStage} for the given {@link CompletionStage} + * that is pinned to the given {@link EventExecutor}. + */ + static FutureCompletionStage toFutureCompletionStage(CompletionStage stage, EventExecutor executor) { + Objects.requireNonNull(stage, "stage"); + Objects.requireNonNull(executor, "executor"); + if (stage instanceof FutureCompletionStage && ((FutureCompletionStage) stage).executor() == executor) { + return (FutureCompletionStage) stage; + } + + // Try fast-path for CompletableFuture instances that are already complete to reduce object creation. + if (stage instanceof CompletableFuture) { + CompletableFuture future = (CompletableFuture) stage; + if (future.isDone() && !future.isCompletedExceptionally()) { + return executor.newSucceededFuture(future.getNow(null)).asStage(); + } + } + + Promise promise = executor.newPromise(); + stage.whenComplete((v, cause) -> { + if (cause != null) { + promise.setFailure(cause); + } else { + promise.setSuccess(v); + } + }); + return promise.asStage(); + } +} diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java b/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java index 8583f6ee3b..8b38392d26 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java @@ -34,6 +34,11 @@ final class RunnableFutureAdapter implements RunnableFuture { this.task = requireNonNull(task, "task"); } + @Override + public EventExecutor executor() { + return promise.executor(); + } + @Override public boolean isSuccess() { return promise.isSuccess(); diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java index 1f81195910..eb8edce265 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java @@ -53,6 +53,11 @@ final class RunnableScheduledFutureAdapter implements RunnableScheduledFuture this.periodNanos = periodNanos; } + @Override + public EventExecutor executor() { + return executor; + } + @Override public long deadlineNanos() { return deadlineNanos; diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultFutureCompletionStageTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultFutureCompletionStageTest.java new file mode 100644 index 0000000000..dad5c5f8a4 --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/DefaultFutureCompletionStageTest.java @@ -0,0 +1,1083 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class DefaultFutureCompletionStageTest { + + private static EventExecutorGroup group; + private static EventExecutorGroup asyncExecutorGroup; + private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException(); + private static final Integer EXPECTED_INTEGER = 1; + private static final Boolean INITIAL_BOOLEAN = Boolean.TRUE; + private static final Boolean EXPECTED_BOOLEAN = Boolean.FALSE; + + @BeforeClass + public static void setup() { + group = new MultithreadEventExecutorGroup(1, Executors.defaultThreadFactory()); + asyncExecutorGroup = new MultithreadEventExecutorGroup(1, Executors.defaultThreadFactory()); + } + + @AfterClass + public static void destroy() { + group.shutdownGracefully(); + asyncExecutorGroup.shutdownGracefully(); + } + + private static EventExecutor executor() { + return group.next(); + } + + private static EventExecutor asyncExecutor() { + return asyncExecutorGroup.next(); + } + + private static Future newSucceededFuture() { + return executor().newSucceededFuture(INITIAL_BOOLEAN); + } + + private static Future newFailedFuture() { + return executor().newFailedFuture(EXPECTED_EXCEPTION); + } + + @Test + public void testSameExecutorAndFuture() { + EventExecutor executor = executor(); + Promise promise = executor.newPromise(); + FutureCompletionStage stage = new DefaultFutureCompletionStage<>(promise); + assertSame(executor, stage.executor()); + assertSame(promise, stage.future()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testThrowsUnsupportedOperationException() { + EventExecutor executor = executor(); + Promise promise = executor.newPromise(); + FutureCompletionStage stage = new DefaultFutureCompletionStage<>(promise); + stage.toCompletableFuture(); + } + + @Test + public void testThenApply() { + testThenApply0(stage -> stage.thenApply(v -> { + assertSame(INITIAL_BOOLEAN, v); + assertTrue(stage.executor().inEventLoop()); + + return EXPECTED_BOOLEAN; + }), false); + } + + @Test + public void testThenApplyAsync() { + testThenApply0(stage -> stage.thenApplyAsync(v -> { + assertSame(INITIAL_BOOLEAN, v); + assertFalse(stage.executor().inEventLoop()); + + return EXPECTED_BOOLEAN; + }), false); + } + + @Test + public void testThenApplyAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + testThenApply0(stage -> stage.thenApplyAsync(v -> { + assertSame(INITIAL_BOOLEAN, v); + assertFalse(stage.executor().inEventLoop()); + + return EXPECTED_BOOLEAN; + }, asyncExecutor), false); + } + + @Test + public void testThenApplyCallbackNotExecuted() { + testHandle0(newFailedFuture(), stage -> stage.thenApply(v -> { + fail(); + return null; + }), true); + } + + @Test + public void testThenApplyAsyncCallbackNotExecuted() { + testHandle0(newFailedFuture(), stage -> stage.thenApplyAsync(v -> { + fail(); + return null; + }), true); + } + + @Test + public void testThenApplyAsyncWithExecutorCallbackNotExecuted() { + EventExecutor asyncExecutor = asyncExecutor(); + testHandle0(newFailedFuture(), stage -> stage.thenApplyAsync(v -> { + fail(); + return null; + }, asyncExecutor), true); + } + + @Test + public void testThenApplyFunctionThrows() { + testThenApply0(stage -> stage.thenApply(v -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testThenApplyAsyncFunctionThrows() { + testThenApply0(stage -> stage.thenApplyAsync(v -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testThenApplyAsyncWithExecutorFunctionThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + testThenApply0(stage -> stage.thenApplyAsync(v -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true); + } + + private void testHandle0(Future future, + Function, FutureCompletionStage> fn, + boolean exception) { + FutureCompletionStage stage = new DefaultFutureCompletionStage<>(future); + + Future f = fn.apply(stage).future().awaitUninterruptibly(); + if (exception) { + assertSame(EXPECTED_EXCEPTION, f.cause()); + } else { + assertSame(EXPECTED_BOOLEAN, f.syncUninterruptibly().getNow()); + } + } + + private void testWhenComplete0(Future future, + Function, FutureCompletionStage> fn, + boolean exception) { + testHandle0(future, stage -> fn.apply(stage).thenApply(v -> { + assertSame(INITIAL_BOOLEAN, v); + return EXPECTED_BOOLEAN; + }), exception); + } + + private void testThenApply0( + Function, FutureCompletionStage> fn, boolean exception) { + testHandle0(newSucceededFuture(), fn, exception); + } + + @Test + public void testThenAccept() { + testThenAccept0(stage -> stage.thenAccept(v -> { + assertSame(INITIAL_BOOLEAN, v); + assertTrue(stage.executor().inEventLoop()); + }), false); + } + + @Test + public void testThenAcceptAsync() { + testThenAccept0(stage -> stage.thenAcceptAsync(v -> { + assertSame(INITIAL_BOOLEAN, v); + + assertFalse(stage.executor().inEventLoop()); + }), false); + } + + @Test + public void testThenAcceptAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + testThenAccept0(stage -> stage.thenAcceptAsync(v -> { + assertSame(INITIAL_BOOLEAN, v); + + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + }, asyncExecutor), false); + } + + @Test + public void testThenAcceptCallbackNotExecuted() { + testThenAccept0(newFailedFuture(), stage -> stage.thenAccept(v -> { + fail(); + }), true); + } + + @Test + public void testThenAcceptAsyncCallbackNotExecuted() { + testThenAccept0(newFailedFuture(), stage -> stage.thenAcceptAsync(v -> { + fail(); + }), true); + } + + @Test + public void testThenAcceptAsyncWithExecutorCallbackNotExecuted() { + EventExecutor asyncExecutor = asyncExecutor(); + testThenAccept0(newFailedFuture(), stage -> stage.thenAcceptAsync(v -> { + fail(); + }, asyncExecutor), true); + } + + @Test + public void testThenAcceptConsumerThrows() { + testThenAccept0(stage -> stage.thenAccept(v -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testThenAcceptAsyncConsumerThrows() { + testThenAccept0(stage -> stage.thenAcceptAsync(v -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testThenAcceptAsyncWithExecutorConsumerThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + testThenAccept0(stage -> stage.thenAcceptAsync(v -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true); + } + + private void testThenAccept0( + Function, FutureCompletionStage> fn, boolean exception) { + testThenAccept0(newSucceededFuture(), fn, exception); + } + + private void testThenAccept0(Future future, + Function, FutureCompletionStage> fn, boolean exception) { + FutureCompletionStage stage = new DefaultFutureCompletionStage<>(future); + Future f = fn.apply(stage).future().awaitUninterruptibly(); + if (exception) { + assertSame(EXPECTED_EXCEPTION, f.cause()); + } else { + assertNull(f.syncUninterruptibly().getNow()); + } + } + + @Test + public void testThenRun() { + testThenAccept0(stage -> stage.thenRun(() -> { + assertTrue(stage.executor().inEventLoop()); + }), false); + } + + @Test + public void testThenRunAsync() { + testThenAccept0(stage -> stage.thenRunAsync(() -> { + assertFalse(stage.executor().inEventLoop()); + }), false); + } + + @Test + public void testThenRunAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + testThenAccept0(stage -> stage.thenRunAsync(() -> { + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + }, asyncExecutor), false); + } + + @Test + public void testThenRunCallbackNotExecuted() { + testThenAccept0(newFailedFuture(), stage -> stage.thenRun(() -> { + fail(); + }), true); + } + + @Test + public void testThenRunAsyncCallbackNotExecuted() { + testThenAccept0(newFailedFuture(), stage -> stage.thenRunAsync(() -> { + fail(); + }), true); + } + + @Test + public void testThenRunAsyncWithExecutorCallbackNotExecuted() { + EventExecutor asyncExecutor = asyncExecutor(); + testThenAccept0(newFailedFuture(), stage -> stage.thenRunAsync(() -> { + fail(); + }, asyncExecutor), true); + } + + @Test + public void testThenRunTaskThrows() { + testThenAccept0(stage -> stage.thenRun(() -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testThenRunAsyncTaskThrows() { + testThenAccept0(stage -> stage.thenRunAsync(() -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testThenRunAsyncWithExecutorTaskThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + testThenAccept0(stage -> stage.thenRunAsync(() -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true); + } + + @Test + public void testThenCombine() { + testCombination0((stage, other) -> stage.thenCombine(other, (v1, v2) -> { + assertSame(v1, INITIAL_BOOLEAN); + assertSame(v2, INITIAL_BOOLEAN); + assertTrue(stage.executor().inEventLoop()); + + return 1; + }), true, CombinationTestMode.COMPLETE); + } + + @Test + public void testThenCombineAsync() { + testCombination0((stage, other) -> stage.thenCombineAsync(other, (v1, v2) -> { + assertSame(v1, INITIAL_BOOLEAN); + assertSame(v2, INITIAL_BOOLEAN); + assertFalse(stage.executor().inEventLoop()); + + return 1; + }), true, CombinationTestMode.COMPLETE); + } + + @Test + public void testThenCombineAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testCombination0((stage, other) -> stage.thenCombineAsync(other, (v1, v2) -> { + assertSame(v1, INITIAL_BOOLEAN); + assertSame(v2, INITIAL_BOOLEAN); + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + return 1; + }, asyncExecutor), true, CombinationTestMode.COMPLETE); + } + + @Test + public void testThenCombineThrowable() { + testCombination0((stage, other) -> stage.thenCombine(other, (v1, v2) -> { + fail(); + return 1; + }), true, CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testThenCombineAsyncThrowable() { + testCombination0((stage, other) -> stage.thenCombineAsync(other, (v1, v2) -> { + fail(); + return 1; + }), true, CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testThenCombineAsyncWithExecutorThrowable() { + EventExecutor asyncExecutor = asyncExecutor(); + + testCombination0((stage, other) -> stage.thenCombineAsync(other, (v1, v2) -> { + fail(); + return 1; + }, asyncExecutor), true, CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testThenCombineThrows() { + testCombination0((stage, other) -> stage.thenCombine(other, (v1, v2) -> { + throw EXPECTED_EXCEPTION; + }), true, CombinationTestMode.THROW); + } + + @Test + public void testThenCombineAsyncThrows() { + testCombination0((stage, other) -> stage.thenCombineAsync(other, (v1, v2) -> { + throw EXPECTED_EXCEPTION; + }), true, CombinationTestMode.THROW); + } + + @Test + public void testThenCombineAsyncWithExecutorThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + + testCombination0((stage, other) -> stage.thenCombineAsync(other, (v1, v2) -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true, CombinationTestMode.THROW); + } + + @Test + public void testThenAcceptBoth() { + testBoth0((stage, other) -> stage.thenAcceptBoth(other, (v1, v2) -> { + assertSame(v1, INITIAL_BOOLEAN); + assertSame(v2, INITIAL_BOOLEAN); + assertTrue(stage.executor().inEventLoop()); + }), CombinationTestMode.COMPLETE); + } + + @Test + public void testThenAcceptBothAsync() { + testBoth0((stage, other) -> stage.thenAcceptBothAsync(other, (v1, v2) -> { + assertSame(v1, INITIAL_BOOLEAN); + assertSame(v2, INITIAL_BOOLEAN); + assertFalse(stage.executor().inEventLoop()); + }), CombinationTestMode.COMPLETE); + } + + @Test + public void testThenAcceptBothAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testBoth0((stage, other) -> stage.thenAcceptBothAsync(other, (v1, v2) -> { + assertSame(v1, INITIAL_BOOLEAN); + assertSame(v2, INITIAL_BOOLEAN); + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + }, asyncExecutor), CombinationTestMode.COMPLETE); + } + + @Test + public void testThenAcceptBothThrowable() { + testBoth0((stage, other) -> stage.thenAcceptBoth(other, (v1, v2) -> { + fail(); + }), CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testThenAcceptBothAsyncThrowable() { + testBoth0((stage, other) -> stage.thenAcceptBothAsync(other, (v1, v2) -> { + fail(); + }), CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testThenAcceptBothAsyncWithExecutorThrowable() { + EventExecutor asyncExecutor = asyncExecutor(); + + testBoth0((stage, other) -> stage.thenAcceptBothAsync(other, (v1, v2) -> { + fail(); + }, asyncExecutor), CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testThenAcceptBothThrows() { + testBoth0((stage, other) -> stage.thenAcceptBoth(other, (v1, v2) -> { + throw EXPECTED_EXCEPTION; + }), CombinationTestMode.THROW); + } + + @Test + public void testThenAcceptBothAsyncThrows() { + testBoth0((stage, other) -> stage.thenCombineAsync(other, (v1, v2) -> { + throw EXPECTED_EXCEPTION; + }), CombinationTestMode.THROW); + } + + @Test + public void testThenAcceptBothAsyncWithExecutorThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + + testBoth0((stage, other) -> stage.thenCombineAsync(other, (v1, v2) -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), CombinationTestMode.THROW); + } + + @Test + public void testRunAfterBoth() { + testBoth0((stage, other) -> stage.runAfterBoth(other, () -> { + assertTrue(stage.executor().inEventLoop()); + }), CombinationTestMode.COMPLETE); + } + + @Test + public void testRunAfterBothAsync() { + testBoth0((stage, other) -> stage.runAfterBothAsync(other, () -> { + assertFalse(stage.executor().inEventLoop()); + }), CombinationTestMode.COMPLETE); + } + + @Test + public void testRunAfterBothAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testBoth0((stage, other) -> stage.runAfterBothAsync(other, () -> { + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + }, asyncExecutor), CombinationTestMode.COMPLETE); + } + + @Test + public void testRunAfterBothThrowable() { + testBoth0((stage, other) -> stage.runAfterBothAsync(other, () -> { + fail(); + }), CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testRunAfterBothAsyncThrowable() { + testBoth0((stage, other) -> stage.runAfterBothAsync(other, () -> { + fail(); + }), CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testRunAfterBothAsyncWithExecutorThrowable() { + EventExecutor asyncExecutor = asyncExecutor(); + + testBoth0((stage, other) -> stage.runAfterBothAsync(other, () -> { + fail(); + }, asyncExecutor), CombinationTestMode.COMPLETE_EXCEPTIONAL); + } + + @Test + public void testRunAfterBothThrows() { + testBoth0((stage, other) -> stage.runAfterBothAsync(other, () -> { + throw EXPECTED_EXCEPTION; + }), CombinationTestMode.THROW); + } + + @Test + public void testRunAfterBothAsyncThrows() { + testBoth0((stage, other) -> stage.runAfterBothAsync(other, () -> { + throw EXPECTED_EXCEPTION; + }), CombinationTestMode.THROW); + } + + @Test + public void testRunAfterBothAsyncWithExecutorThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + + testBoth0((stage, other) -> stage.runAfterBothAsync(other, () -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), CombinationTestMode.THROW); + } + + @Test + public void testApplyToEither() { + testCombination0((stage, other) -> stage.applyToEither(other, v1 -> { + assertSame(v1, INITIAL_BOOLEAN); + assertTrue(stage.executor().inEventLoop()); + + return 1; + }), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testApplyToEitherAsync() { + testCombination0((stage, other) -> stage.applyToEitherAsync(other, v1 -> { + assertSame(v1, INITIAL_BOOLEAN); + assertFalse(stage.executor().inEventLoop()); + + return 1; + }), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testApplyToEitherAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testCombination0((stage, other) -> stage.applyToEitherAsync(other, v1 -> { + assertSame(v1, INITIAL_BOOLEAN); + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + return 1; + }, asyncExecutor), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testApplyToEitherThrows() { + testCombination0((stage, other) -> stage.applyToEither(other, v1 -> { + throw EXPECTED_EXCEPTION; + }), false, CombinationTestMode.THROW); + } + + @Test + public void testApplyToEitherAsyncThrows() { + testCombination0((stage, other) -> stage.applyToEitherAsync(other, v1 -> { + throw EXPECTED_EXCEPTION; + }), false, CombinationTestMode.THROW); + } + + @Test + public void testApplyToEitherAsyncWithExecutorThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + + testCombination0((stage, other) -> stage.applyToEitherAsync(other, v1 -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true, CombinationTestMode.THROW); + } + + @Test + public void testAcceptEither() { + testEither0((stage, other) -> stage.acceptEither(other, v1 -> { + assertSame(v1, INITIAL_BOOLEAN); + assertTrue(stage.executor().inEventLoop()); + }), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testAcceptEitherAsync() { + testEither0((stage, other) -> stage.acceptEitherAsync(other, v1 -> { + assertSame(v1, INITIAL_BOOLEAN); + assertFalse(stage.executor().inEventLoop()); + }), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testAcceptEitherAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testEither0((stage, other) -> stage.acceptEitherAsync(other, v1 -> { + assertSame(v1, INITIAL_BOOLEAN); + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + }, asyncExecutor), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testAcceptEitherThrows() { + testEither0((stage, other) -> stage.acceptEither(other, v1 -> { + throw EXPECTED_EXCEPTION; + }), false, CombinationTestMode.THROW); + } + + @Test + public void testAcceptEitherAsyncThrows() { + testEither0((stage, other) -> stage.acceptEitherAsync(other, v1 -> { + throw EXPECTED_EXCEPTION; + }), false, CombinationTestMode.THROW); + } + + @Test + public void testAcceptEitherAsyncWithExecutorThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + + testEither0((stage, other) -> stage.acceptEitherAsync(other, v1 -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true, CombinationTestMode.THROW); + } + + @Test + public void testRunAfterEither() { + testEither0((stage, other) -> stage.runAfterEither(other, () -> { + assertTrue(stage.executor().inEventLoop()); + }), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testRunAfterEitherAsync() { + testEither0((stage, other) -> stage.runAfterEitherAsync(other, () -> { + assertFalse(stage.executor().inEventLoop()); + }), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testRunAfterEitherAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testEither0((stage, other) -> stage.runAfterEitherAsync(other, () -> { + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + }, asyncExecutor), false, CombinationTestMode.COMPLETE); + } + + @Test + public void testRunAfterEitherThrows() { + testEither0((stage, other) -> stage.runAfterEither(other, () -> { + throw EXPECTED_EXCEPTION; + }), false, CombinationTestMode.THROW); + } + + @Test + public void testRunAfterEitherAsyncThrows() { + testEither0((stage, other) -> stage.runAfterEitherAsync(other, () -> { + throw EXPECTED_EXCEPTION; + }), false, CombinationTestMode.THROW); + } + + @Test + public void testRunAfterEitherAsyncWithExecutorThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + + testEither0((stage, other) -> stage.runAfterEitherAsync(other, () -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true, CombinationTestMode.THROW); + } + + private enum CombinationTestMode { + COMPLETE, + COMPLETE_EXCEPTIONAL, + THROW + } + + private void testEither0(BiFunction, + CompletionStage, FutureCompletionStage> fn, + boolean notifyAll, CombinationTestMode testMode) { + testCombination0((futureStage, stage) -> fn.apply(futureStage, stage).thenApply(v -> { + assertNull(v); + return EXPECTED_INTEGER; + }), notifyAll, testMode); + } + + private void testBoth0(BiFunction, + CompletionStage, FutureCompletionStage> fn, CombinationTestMode testMode) { + testCombination0((futureStage, stage) -> fn.apply(futureStage, stage).thenApply(v -> { + assertNull(v); + return EXPECTED_INTEGER; + }), true, testMode); + } + + private void testCombination0(BiFunction, + CompletionStage, FutureCompletionStage> fn, + boolean notifyAll, CombinationTestMode testMode) { + EventExecutor executor = executor(); + + // We run this in a loop as we we need to ensure our implementation is thread-safe as the both stages + // may use different threads. + for (int i = 0; i < 1000; i++) { + Promise promise = executor.newPromise(); + FutureCompletionStage stage = new DefaultFutureCompletionStage<>(promise); + CompletableFuture completableFuture = new CompletableFuture<>(); + + Future f = fn.apply(stage, completableFuture).future(); + + List runnables = new ArrayList<>(2); + switch (testMode) { + case THROW: + Collections.addAll(runnables, () -> completableFuture.completeExceptionally(EXPECTED_EXCEPTION), + () -> promise.setFailure(EXPECTED_EXCEPTION)); + break; + case COMPLETE_EXCEPTIONAL: + // Let's randomly notify either one or both of the promise / future with an exception. + int random = ThreadLocalRandom.current().nextInt(0, 3); + if (random == 0) { + Collections.addAll(runnables, () -> completableFuture.complete(INITIAL_BOOLEAN), + () -> promise.setFailure(EXPECTED_EXCEPTION)); + } else if (random == 1) { + Collections.addAll(runnables, () -> completableFuture.completeExceptionally(EXPECTED_EXCEPTION), + () -> promise.setSuccess(INITIAL_BOOLEAN)); + } else { + Collections.addAll(runnables, () -> completableFuture.completeExceptionally(EXPECTED_EXCEPTION), + () -> promise.setFailure(EXPECTED_EXCEPTION)); + } + break; + case COMPLETE: + Collections.addAll(runnables, () -> completableFuture.complete(INITIAL_BOOLEAN), + () -> promise.setSuccess(INITIAL_BOOLEAN)); + break; + default: + fail(); + } + + Collections.shuffle(runnables); + for (Runnable task : runnables) { + ForkJoinPool.commonPool().execute(task); + if (!notifyAll) { + break; + } + } + + f.awaitUninterruptibly(); + + switch (testMode) { + case COMPLETE_EXCEPTIONAL: + case THROW: + assertSame(EXPECTED_EXCEPTION, f.cause()); + break; + case COMPLETE: + assertEquals(EXPECTED_INTEGER, f.syncUninterruptibly().getNow()); + break; + default: + fail(); + } + } + } + + @Test + public void testThenCompose() { + testHandle0(newSucceededFuture(), stage -> stage.thenCompose(v -> { + assertSame(INITIAL_BOOLEAN, v); + + assertTrue(stage.executor().inEventLoop()); + + return CompletableFuture.completedFuture(EXPECTED_BOOLEAN); + }), false); + } + + @Test + public void testThenComposeAsync() { + testHandle0(newSucceededFuture(), stage -> stage.thenComposeAsync(v -> { + assertSame(INITIAL_BOOLEAN, v); + + assertFalse(stage.executor().inEventLoop()); + + return CompletableFuture.completedFuture(EXPECTED_BOOLEAN); + }), false); + } + + @Test + public void testThenComposeAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testHandle0(newSucceededFuture(), stage -> stage.thenComposeAsync(v -> { + assertSame(INITIAL_BOOLEAN, v); + + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + + return CompletableFuture.completedFuture(EXPECTED_BOOLEAN); + }, asyncExecutor), false); + } + + @Test + public void testThenComposeThrows() { + testHandle0(newSucceededFuture(), stage -> stage.thenCompose(v -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testThenComposeAsyncThrows() { + testHandle0(newSucceededFuture(), stage -> stage.thenComposeAsync(v -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testThenComposeWithExecutorThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + testHandle0(newSucceededFuture(), stage -> stage.thenComposeAsync(v -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true); + } + + @Test + public void testExceptionally() { + testHandle0(newFailedFuture(), stage -> stage.exceptionally(error -> { + assertSame(EXPECTED_EXCEPTION, error); + return EXPECTED_BOOLEAN; + }), false); + } + + @Test + public void testExceptionallyThrows() { + testHandle0(newFailedFuture(), stage -> stage.exceptionally(error -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testWhenComplete() { + testWhenComplete0(newSucceededFuture(), stage -> stage.whenComplete((v, cause) -> { + assertSame(INITIAL_BOOLEAN, v); + assertNull(cause); + assertTrue(stage.executor().inEventLoop()); + }), false); + } + + @Test + public void testWhenCompleteAsync() { + testWhenComplete0(newSucceededFuture(), stage -> stage.whenCompleteAsync((v, cause) -> { + assertSame(INITIAL_BOOLEAN, v); + assertNull(cause); + + assertFalse(stage.executor().inEventLoop()); + }), false); + } + + @Test + public void testWhenCompleteAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testWhenComplete0(newSucceededFuture(), stage -> stage.whenCompleteAsync((v, cause) -> { + assertSame(INITIAL_BOOLEAN, v); + assertNull(cause); + + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + }, asyncExecutor), false); + } + + @Test + public void testWhenCompleteThrowable() { + testWhenComplete0(newFailedFuture(), stage -> stage.whenComplete((v, cause) -> { + assertSame(EXPECTED_EXCEPTION, cause); + assertNull(v); + + assertTrue(stage.future().executor().inEventLoop()); + }), true); + } + + @Test + public void testWhenCompleteAsyncThrowable() { + testWhenComplete0(newFailedFuture(), stage -> stage.whenCompleteAsync((v, cause) -> { + assertSame(EXPECTED_EXCEPTION, cause); + assertNull(v); + + assertFalse(stage.future().executor().inEventLoop()); + }), true); + } + + @Test + public void testWhenCompleteAsyncWithExecutorThrowable() { + EventExecutor asyncExecutor = asyncExecutor(); + testWhenComplete0(newFailedFuture(), stage -> stage.whenCompleteAsync((v, cause) -> { + assertSame(EXPECTED_EXCEPTION, cause); + assertNull(v); + + assertFalse(stage.future().executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + }, asyncExecutor), true); + } + + @Test + public void testWhenCompleteThrows() { + testWhenComplete0(newSucceededFuture(), stage -> stage.whenComplete((v, cause) -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testWhenCompleteAsyncThrows() { + testWhenComplete0(newSucceededFuture(), stage -> stage.whenCompleteAsync((v, cause) -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testWhenCompleteAsyncWithExecutorThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + testWhenComplete0(newSucceededFuture(), stage -> stage.whenCompleteAsync((v, cause) -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true); + } + + @Test + public void testHandle() { + testHandle0(newSucceededFuture(), stage -> stage.handle((v, cause) -> { + assertSame(INITIAL_BOOLEAN, v); + assertNull(cause); + + assertTrue(stage.executor().inEventLoop()); + + return EXPECTED_BOOLEAN; + }), false); + } + + @Test + public void testHandleAsync() { + testHandle0(newSucceededFuture(), stage -> stage.handleAsync((v, cause) -> { + assertSame(INITIAL_BOOLEAN, v); + assertNull(cause); + + assertFalse(stage.executor().inEventLoop()); + + return EXPECTED_BOOLEAN; + }), false); + } + + @Test + public void testHandleAsyncWithExecutor() { + EventExecutor asyncExecutor = asyncExecutor(); + + testHandle0(newSucceededFuture(), stage -> stage.handleAsync((v, cause) -> { + assertSame(INITIAL_BOOLEAN, v); + assertNull(cause); + + assertFalse(stage.executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + + return EXPECTED_BOOLEAN; + }, asyncExecutor), false); + } + + @Test + public void testHandleThrowable() { + testHandle0(newFailedFuture(), stage -> stage.handle((v, cause) -> { + assertSame(EXPECTED_EXCEPTION, cause); + assertNull(v); + + assertTrue(stage.future().executor().inEventLoop()); + + return EXPECTED_BOOLEAN; + }), false); + } + + @Test + public void testHandleAsyncThrowable() { + testHandle0(newFailedFuture(), stage -> stage.handleAsync((v, cause) -> { + assertSame(EXPECTED_EXCEPTION, cause); + assertNull(v); + + assertFalse(stage.future().executor().inEventLoop()); + + return EXPECTED_BOOLEAN; + }), false); + } + + @Test + public void testHandleAsyncWithExecutorThrowable() { + EventExecutor asyncExecutor = asyncExecutor(); + testHandle0(newFailedFuture(), stage -> stage.handleAsync((v, cause) -> { + assertSame(EXPECTED_EXCEPTION, cause); + assertNull(v); + + assertFalse(stage.future().executor().inEventLoop()); + assertTrue(asyncExecutor.inEventLoop()); + + return EXPECTED_BOOLEAN; + }, asyncExecutor), false); + } + + @Test + public void testHandleFunctionThrows() { + testHandle0(newSucceededFuture(), stage -> stage.handle((v, cause) -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testHandleAsyncFunctionThrows() { + testHandle0(newSucceededFuture(), stage -> stage.handleAsync((v, cause) -> { + throw EXPECTED_EXCEPTION; + }), true); + } + + @Test + public void testHandleAsyncWithExecutorFunctionThrows() { + EventExecutor asyncExecutor = asyncExecutor(); + testHandle0(newSucceededFuture(), stage -> stage.handleAsync((v, cause) -> { + throw EXPECTED_EXCEPTION; + }, asyncExecutor), true); + } +} diff --git a/common/src/test/java/io/netty/util/concurrent/FutureCompletionStageTest.java b/common/src/test/java/io/netty/util/concurrent/FutureCompletionStageTest.java new file mode 100644 index 0000000000..36f36a6a25 --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/FutureCompletionStageTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +public class FutureCompletionStageTest { + + @Test + public void testCompleteFuture() { + FutureCompletionStage stage = FutureCompletionStage.toFutureCompletionStage( + CompletableFuture.completedFuture(Boolean.TRUE), ImmediateEventExecutor.INSTANCE); + assertSame(ImmediateEventExecutor.INSTANCE, stage.executor()); + assertSame(Boolean.TRUE, stage.future().syncUninterruptibly().getNow()); + } + + @Test + public void testCompleteFutureFailed() { + IllegalStateException exception = new IllegalStateException(); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(exception); + + FutureCompletionStage stage = FutureCompletionStage.toFutureCompletionStage( + future, ImmediateEventExecutor.INSTANCE); + assertSame(ImmediateEventExecutor.INSTANCE, stage.executor()); + assertSame(exception, stage.future().awaitUninterruptibly().cause()); + } + + @Test + public void testFutureCompletionStageWithSameExecutor() { + FutureCompletionStage stage = ImmediateEventExecutor.INSTANCE + .newSucceededFuture(Boolean.TRUE).asStage(); + assertSame(stage, FutureCompletionStage.toFutureCompletionStage(stage, ImmediateEventExecutor.INSTANCE)); + } + + @Test + public void testFutureCompletionStageWithDifferentExecutor() { + MultithreadEventExecutorGroup group = new MultithreadEventExecutorGroup(1, Executors.defaultThreadFactory()); + try { + FutureCompletionStage stage = group.next().newSucceededFuture(Boolean.TRUE).asStage(); + FutureCompletionStage stage2 = FutureCompletionStage.toFutureCompletionStage( + stage, ImmediateEventExecutor.INSTANCE); + assertNotSame(stage, stage2); + assertSame(stage.future().syncUninterruptibly().getNow(), stage2.future().syncUninterruptibly().getNow()); + } finally { + group.shutdownGracefully(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java index 508c4dd6c8..07c7d1e367 100644 --- a/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/CompleteChannelFuture.java @@ -42,7 +42,7 @@ abstract class CompleteChannelFuture extends CompleteFuture implements Cha } @Override - protected EventExecutor executor() { + public EventExecutor executor() { EventExecutor e = super.executor(); if (e == null) { return channel().eventLoop(); diff --git a/transport/src/main/java/io/netty/channel/DelegatingChannelPromiseNotifier.java b/transport/src/main/java/io/netty/channel/DelegatingChannelPromiseNotifier.java index 5aab4d2b99..8385694a9f 100644 --- a/transport/src/main/java/io/netty/channel/DelegatingChannelPromiseNotifier.java +++ b/transport/src/main/java/io/netty/channel/DelegatingChannelPromiseNotifier.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.PromiseNotificationUtil; @@ -44,6 +45,11 @@ public final class DelegatingChannelPromiseNotifier implements ChannelPromise, C this.logNotifyFailure = logNotifyFailure; } + @Override + public EventExecutor executor() { + return delegate.executor(); + } + @Override public void operationComplete(ChannelFuture future) throws Exception { InternalLogger internalLogger = logNotifyFailure ? logger : null; diff --git a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java index 0d77458025..a72ee5db18 100644 --- a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java @@ -17,7 +17,7 @@ package io.netty.channel; import static java.util.Objects.requireNonNull; -import io.netty.util.concurrent.AbstractFuture; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.UnstableApi; @@ -25,7 +25,7 @@ import io.netty.util.internal.UnstableApi; import java.util.concurrent.TimeUnit; @UnstableApi -public final class VoidChannelPromise extends AbstractFuture implements ChannelPromise { +public final class VoidChannelPromise implements ChannelPromise { private final Channel channel; // Will be null if we should not propagate exceptions through the pipeline on failure case. @@ -51,6 +51,11 @@ public final class VoidChannelPromise extends AbstractFuture implements Ch } } + @Override + public EventExecutor executor() { + return channel.eventLoop(); + } + @Override public VoidChannelPromise addListener(GenericFutureListener> listener) { fail(); diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index e8f655682b..1e0d5a5e9f 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -45,7 +45,7 @@ public class DefaultChannelGroup extends AbstractSet implements Channel private static final AtomicInteger nextId = new AtomicInteger(); private final String name; - private final EventExecutor executor; + final EventExecutor executor; private final ConcurrentMap serverChannels = new ConcurrentHashMap<>(); private final ConcurrentMap nonServerChannels = new ConcurrentHashMap<>(); private final ChannelFutureListener remover = future -> remove(future.channel()); diff --git a/transport/src/main/java/io/netty/channel/group/VoidChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/VoidChannelGroupFuture.java index 740ac33702..a282710d9b 100644 --- a/transport/src/main/java/io/netty/channel/group/VoidChannelGroupFuture.java +++ b/transport/src/main/java/io/netty/channel/group/VoidChannelGroupFuture.java @@ -17,6 +17,7 @@ package io.netty.channel.group; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -27,12 +28,17 @@ import java.util.concurrent.TimeUnit; final class VoidChannelGroupFuture implements ChannelGroupFuture { private static final Iterator EMPTY = Collections.emptyList().iterator(); - private final ChannelGroup group; + private final DefaultChannelGroup group; - VoidChannelGroupFuture(ChannelGroup group) { + VoidChannelGroupFuture(DefaultChannelGroup group) { this.group = group; } + @Override + public EventExecutor executor() { + return group.executor; + } + @Override public ChannelGroup group() { return group;