From 59275fba525ceaf7a493c8115e20d69ee97272b1 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 8 Sep 2021 09:06:28 +0200 Subject: [PATCH] Netty Future no longer extends JDK Future (#11647) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: It is important to avoid blocking method calls in an event loop thread, since that can stall the system. Netty's Future interface was extending the JDK Future interface, which included a number of blocking methods of questionable use in Netty. We wish to reduce the number of blocking methods on the Future API in order to discourage their use a little. Further more, the Netty Future specification of the behaviour of the cancel() and isDone() methods are inconsistent with those of the JDK Future. If Netty's Future stop extending the JDK Future interface, it will also no longer be bound by its specification. Modification: Make Netty's Future no longer extend the JDK Future interface. Change the EvenExecutorGroup interface to no longer extend ScheduledExecutorService. The EventExecutorGroup still extends Executor, because Executor does not dictate any return type of the `execute()` method — this is also useful in the DefaultFutureCompletionStage implementation. The Netty ScheduledFuture interface has been removed since it provided no additional features that were actually used. Numerous changes to use sites that previously relied on the JDK types. Remove the `Future.cancel()` method that took a boolean argument — this argument was always ignored in our implementations, which was another spec deviation. Various `invoke*` and `shutdown*` methods have been removed from the EvenExecutorGroup API since it no longer extends ScheduledExecutorService — these were either not used anywhere, or deprecated with better alternatives available. Updates to cancellation javadocs. Result: Cleaner code, leaner API. --- .../websocketx/WebSocketClientHandshaker.java | 2 +- ...bSocketClientProtocolHandshakeHandler.java | 2 +- .../websocketx/WebSocketProtocolHandler.java | 2 +- ...bSocketServerProtocolHandshakeHandler.java | 2 +- .../codec/http2/Http2ConnectionHandler.java | 3 +- .../concurrent/AbstractEventExecutor.java | 58 ++++- .../AbstractScheduledEventExecutor.java | 82 +++---- .../util/concurrent/AsynchronousResult.java | 11 +- .../DefaultFutureCompletionStage.java | 30 ++- .../netty/util/concurrent/DefaultPromise.java | 17 +- .../util/concurrent/EventExecutorGroup.java | 232 ++++++++++++------ .../java/io/netty/util/concurrent/Future.java | 44 +++- .../io/netty/util/concurrent/Futures.java | 2 +- .../util/concurrent/GlobalEventExecutor.java | 12 +- .../concurrent/ImmediateEventExecutor.java | 34 ++- .../MultithreadEventExecutorGroup.java | 14 +- .../NonStickyEventExecutorGroup.java | 100 +++----- .../io/netty/util/concurrent/PromiseTask.java | 5 + .../netty/util/concurrent/RunnableFuture.java | 3 +- .../concurrent/RunnableFutureAdapter.java | 11 +- .../concurrent/RunnableScheduledFuture.java | 15 +- .../RunnableScheduledFutureAdapter.java | 42 ++-- .../util/concurrent/ScheduledFuture.java | 23 -- .../concurrent/SingleThreadEventExecutor.java | 92 +------ .../UnorderedThreadPoolEventExecutor.java | 127 ++++++---- .../AbstractScheduledEventExecutorTest.java | 31 ++- .../util/concurrent/DefaultPromiseTest.java | 56 +++-- .../io/netty/util/concurrent/FuturesTest.java | 14 +- .../concurrent/GlobalEventExecutorTest.java | 12 +- .../SingleThreadEventExecutorTest.java | 83 +------ .../UnorderedThreadPoolEventExecutorTest.java | 39 ++- .../io/netty/handler/proxy/ProxyHandler.java | 2 +- .../flush/FlushConsolidationHandler.java | 2 +- .../java/io/netty/handler/ssl/SslHandler.java | 6 +- .../handler/timeout/IdleStateHandler.java | 6 +- .../handler/timeout/WriteTimeoutHandler.java | 4 +- .../traffic/GlobalChannelTrafficCounter.java | 10 +- .../GlobalChannelTrafficShapingHandler.java | 24 +- .../traffic/GlobalTrafficShapingHandler.java | 21 +- .../netty/handler/traffic/TrafficCounter.java | 17 +- .../traffic/TrafficShapingHandlerTest.java | 9 +- .../epoll/EpollSocketChannelBenchmark.java | 2 +- .../BurstCostExecutorsBenchmark.java | 139 ++++++----- ...nnableScheduledFutureAdapterBenchmark.java | 4 +- .../AbstractSharedExecutorMicrobenchmark.java | 36 ++- .../java/io/netty/resolver/dns/Cache.java | 11 +- .../netty/resolver/dns/DnsQueryContext.java | 2 +- .../netty/resolver/dns/DnsResolveContext.java | 2 +- .../AbstractSingleThreadEventLoopTest.java | 3 +- .../socket/SocketCancelWriteTest.java | 4 +- .../socket/SocketConnectionAttemptTest.java | 2 +- .../channel/epoll/AbstractEpollChannel.java | 6 +- .../channel/epoll/EpollEventLoopTest.java | 2 +- .../channel/kqueue/AbstractKQueueChannel.java | 4 +- .../channel/kqueue/KQueueEventLoopTest.java | 2 +- .../main/java/io/netty/channel/Channel.java | 1 + .../netty/channel/SingleThreadEventLoop.java | 2 +- .../channel/embedded/EmbeddedEventLoop.java | 16 +- .../netty/channel/nio/AbstractNioChannel.java | 6 +- .../io/netty/channel/AbstractChannelTest.java | 7 +- .../channel/SingleThreadEventLoopTest.java | 44 ++-- .../netty/channel/nio/NioEventLoopTest.java | 7 +- .../socket/nio/AbstractNioChannelTest.java | 35 ++- 63 files changed, 809 insertions(+), 829 deletions(-) delete mode 100644 common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java index 18c10647dd..9aabd260e3 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java @@ -504,7 +504,7 @@ public abstract class WebSocketClientHandshaker { }, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS); channel.closeFuture().addListener(ignore -> { - forceCloseFuture.cancel(false); + forceCloseFuture.cancel(); }); } }); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java index 055cde8b46..7d6cf101be 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java @@ -117,7 +117,7 @@ class WebSocketClientProtocolHandshakeHandler implements ChannelHandler { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index dab9bdc726..3436a04d50 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -125,7 +125,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder timeoutTask.cancel(false)); + closeSent.asFuture().addListener(future -> timeoutTask.cancel()); } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java index 5c35b39784..1968267c3d 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java @@ -157,6 +157,6 @@ class WebSocketServerProtocolHandshakeHandler implements ChannelHandler { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index cde8ff15b0..12bb1339e8 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -27,7 +27,6 @@ import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -900,7 +899,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http @Override public void operationComplete(Future sentGoAwayFuture) { if (timeoutTask != null) { - timeoutTask.cancel(false); + timeoutTask.cancel(); } doClose(); } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 1e8fc6fbee..1158c248d9 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -18,14 +18,15 @@ package io.netty.util.concurrent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import static java.util.Objects.requireNonNull; + /** * Abstract base class for {@link EventExecutor} implementations. */ -public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { +public abstract class AbstractEventExecutor implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class); static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; @@ -43,26 +44,60 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl } @Override - public final Future submit(Runnable task) { - return (Future) super.submit(task); + public final Future submit(Runnable task) { + var futureTask = newTaskFor(task, (Void) null); + execute(futureTask); + return futureTask; } @Override public final Future submit(Runnable task, T result) { - return (Future) super.submit(task, result); + var futureTask = newTaskFor(task, result); + execute(futureTask); + return futureTask; } @Override public final Future submit(Callable task) { - return (Future) super.submit(task); + var futureTask = newTaskFor(task); + execute(futureTask); + return futureTask; } - @Override + /** + * Decorate the given {@link Runnable} and its return value, as a {@link RunnableFuture}, such that the + * returned {@link RunnableFuture} completes with the given result at the end of executing its + * {@link RunnableFuture#run()} method. + *

+ * The returned {@link RunnableFuture} is the task that will actually be run by a thread in this + * executor. + *

+ * This method can be overridden by sub-classes to hook into the life cycle of the given task. + * + * @param runnable The task to be decorated. + * @param value The value that the returned future will complete with, assuming the given {@link Runnable} doesn't + * throw an exception. + * @param The type of the result value. + * @return The decorated {@link Runnable} that is now also a {@link Future}. + */ protected RunnableFuture newTaskFor(Runnable runnable, T value) { return newRunnableFuture(newPromise(), runnable, value); } - @Override + /** + * Decorate the given {@link Callable} and its return value, as a {@link RunnableFuture}, such that the + * returned {@link RunnableFuture} completes with the returned result from the {@link Callable} at the end of + * executing its {@link RunnableFuture#run()} method. + *

+ * The returned {@link RunnableFuture} is the task that will actually be run by a thread in this + * executor. + *

+ * This method can be overridden by sub-classes to hook into the life cycle of the given task. + * + * @param callable The task to be decorated. + * @param The type of the result value. + * @return The decorated {@link Runnable} that is now also a {@link Future}. + */ protected RunnableFuture newTaskFor(Callable callable) { return newRunnableFuture(newPromise(), callable); } @@ -85,17 +120,14 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl * {@link RunnableFuture}. */ private static RunnableFuture newRunnableFuture(Promise promise, Callable task) { - return new RunnableFutureAdapter<>(promise, task); + return new RunnableFutureAdapter<>(promise, requireNonNull(task, "task")); } /** * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Runnable} and * {@code value}. - * - * This can be used if you want to override {@link #newTaskFor(Runnable, V)} and return a different - * {@link RunnableFuture}. */ private static RunnableFuture newRunnableFuture(Promise promise, Runnable task, V value) { - return new RunnableFutureAdapter<>(promise, Executors.callable(task, value)); + return new RunnableFutureAdapter<>(promise, Executors.callable(requireNonNull(task, "task"), value)); } } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 478d479cb1..e61fe326b8 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.PriorityQueue; import io.netty.util.internal.PriorityQueueNode; @@ -24,12 +22,13 @@ import io.netty.util.internal.PriorityQueueNode; import java.util.Comparator; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.callable; + /** * Abstract base class for {@link EventExecutor}s that want to support scheduling. */ @@ -60,7 +59,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut static long deadlineNanos(long delay) { long deadlineNanos = nanoTime() + delay; // Guard against overflow - return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; + return deadlineNanos < 0? Long.MAX_VALUE : deadlineNanos; } PriorityQueue> scheduledTaskQueue() { @@ -79,7 +78,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut /** * Cancel all scheduled tasks. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final void cancelScheduledTasks() { @@ -92,8 +91,8 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut final RunnableScheduledFutureNode[] scheduledTasks = scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES); - for (RunnableScheduledFutureNode task: scheduledTasks) { - task.cancel(false); + for (RunnableScheduledFutureNode task : scheduledTasks) { + task.cancel(); } scheduledTaskQueue.clearIgnoringIndexes(); @@ -107,16 +106,16 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } /** - * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. - * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. - * + * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link + * #nanoTime()} to retrieve the correct {@code nanoTime}. + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final RunnableScheduledFuture pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } @@ -130,12 +129,12 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut /** * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final long nextScheduledTaskNano() { Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return -1; } @@ -152,30 +151,30 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut /** * Returns {@code true} if a scheduled task is ready for processing. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final boolean hasScheduledTasks() { assert inEventLoop(); Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + public Future schedule(Runnable command, long delay, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (delay < 0) { delay = 0; } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command), - deadlineNanos(unit.toNanos(delay)), 0); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(delay)), 0); return schedule(task); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable callable, long delay, TimeUnit unit) { requireNonNull(callable, "callable"); requireNonNull(unit, "unit"); if (delay < 0) { @@ -186,7 +185,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (initialDelay < 0) { @@ -198,13 +197,13 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut String.format("period: %d (expected: > 0)", period)); } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)); return schedule(task); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (initialDelay < 0) { @@ -216,15 +215,15 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut String.format("delay: %d (expected: > 0)", delay)); } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)); return schedule(task); } /** * Add the {@link RunnableScheduledFuture} for execution. */ - protected final ScheduledFuture schedule(final RunnableScheduledFuture task) { + protected final Future schedule(final RunnableScheduledFuture task) { if (inEventLoop()) { add0(task); } else { @@ -253,9 +252,9 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut /** * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}. - * - * This can be used if you want to override {@link #newTaskFor(Callable)} and return a different - * {@link RunnableFuture}. + *

+ * This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a + * different {@link RunnableFuture}. */ protected static RunnableScheduledFuture newRunnableScheduledFuture( AbstractScheduledEventExecutor executor, Promise promise, Callable task, @@ -271,7 +270,8 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period); } - interface RunnableScheduledFutureNode extends PriorityQueueNode, RunnableScheduledFuture { } + interface RunnableScheduledFutureNode extends PriorityQueueNode, RunnableScheduledFuture { + } private static final class DefaultRunnableScheduledFutureNode implements RunnableScheduledFutureNode { private final RunnableScheduledFuture future; @@ -308,8 +308,8 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } @Override - public RunnableScheduledFuture addListener(C context, - FutureContextListener listener) { + public RunnableScheduledFuture addListener( + C context, FutureContextListener listener) { future.addListener(context, listener); return this; } @@ -335,8 +335,8 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); + public boolean cancel() { + return future.cancel(); } @Override @@ -360,12 +360,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } @Override - public long getDelay(TimeUnit unit) { - return future.getDelay(unit); - } - - @Override - public int compareTo(Delayed o) { + public int compareTo(RunnableScheduledFuture o) { return future.compareTo(o); } @@ -393,11 +388,6 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut return this; } - @Override - public boolean cancel() { - return cancel(false); - } - @Override public boolean isSuccess() { return future.isSuccess(); diff --git a/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java b/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java index 968585aa91..3dd8690f32 100644 --- a/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java +++ b/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java @@ -25,9 +25,10 @@ import java.util.concurrent.CancellationException; */ interface AsynchronousResult { /** - * Cancel this asynchronous operation, unless it has already been completed. + * Cancel this asynchronous operation, unless it has already been completed + * or is not {@linkplain #isCancellable() cancellable}. *

- * A cancelled operation is considered to be {@linkplain #isFailed() failed}. + * A cancelled operation is considered to be {@linkplain #isDone() done} and {@linkplain #isFailed() failed}. *

* If the cancellation was successful, the result of this operation will be that it has failed with a * {@link CancellationException}. @@ -66,7 +67,11 @@ interface AsynchronousResult { boolean isDone(); /** - * returns {@code true} if and only if the operation can be cancelled via {@link #cancel()}. + * Returns {@code true} if and only if the operation can be cancelled via {@link #cancel()}. + * Note that this is inherently racy, as the operation could be made + * {@linkplain Promise#setUncancellable() uncancellable} at any time. + * + * @return {@code true} if this operation can be cancelled. */ boolean isCancellable(); diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java b/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java index 62b2b013ce..df37edb1e7 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java @@ -17,8 +17,11 @@ package io.netty.util.concurrent; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -33,7 +36,7 @@ import static java.util.Objects.requireNonNull; * * @param the value type. */ -final class DefaultFutureCompletionStage implements FutureCompletionStage { +final class DefaultFutureCompletionStage implements FutureCompletionStage, java.util.concurrent.Future { private enum Marker { EMPTY, ERROR @@ -50,6 +53,31 @@ final class DefaultFutureCompletionStage implements FutureCompletionStage this.future = future; } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + @Override public Future future() { return future; diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 9136b67954..a88a4f5050 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -384,14 +384,6 @@ public class DefaultPromise implements Promise, Future { @Override public boolean cancel() { - return cancel(false); - } - - /** - * @param mayInterruptIfRunning this value has no effect in this implementation. - */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { if (checkNotifyWaiters()) { notifyListeners(); @@ -668,6 +660,15 @@ public class DefaultPromise implements Promise, Future { @Override public FutureCompletionStage asStage() { + return getFutureStageAdaptor(); + } + + @Override + public java.util.concurrent.Future asJdkFuture() { + return getFutureStageAdaptor(); + } + + private DefaultFutureCompletionStage getFutureStageAdaptor() { DefaultFutureCompletionStage stageAdapter = stage; if (stageAdapter == null) { stage = stageAdapter = new DefaultFutureCompletionStage<>(this); diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java index b409bbc026..e2cd2f47ac 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java @@ -15,15 +15,10 @@ */ package io.netty.util.concurrent; -import java.util.Collection; -import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static io.netty.util.concurrent.AbstractEventExecutor.DEFAULT_SHUTDOWN_QUIET_PERIOD; import static io.netty.util.concurrent.AbstractEventExecutor.DEFAULT_SHUTDOWN_TIMEOUT; @@ -34,61 +29,88 @@ import static io.netty.util.concurrent.AbstractEventExecutor.DEFAULT_SHUTDOWN_TI * life-cycle and allows shutting them down in a global fashion. * */ -public interface EventExecutorGroup extends ScheduledExecutorService, Iterable { - +public interface EventExecutorGroup extends Iterable, Executor { /** * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} * are being {@linkplain #shutdownGracefully() shut down gracefully} or was {@linkplain #isShutdown() shut down}. + *

+ * An executor group that "is shutting down" can still accept new tasks for a little while (the grace period), + * but will eventually start rejecting new tasks. + * At that point, the executor group will be {@linkplain #isShutdown() shut down}. + * + * @return {@code true} if all executors in this group have at least started shutting down, otherwise {@code false}. */ boolean isShuttingDown(); + /** + * Returns {@code true} if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} have been + * {@linkplain #shutdownGracefully() shut down gracefully} and moved past the grace period so that they are no + * longer accepting any new tasks. + *

+ * An executor group that "is shut down" might still be executing tasks that it has queued up, but it will no + * longer be accepting any new tasks. + * Once all running and queued tasks have completed, the executor group will be + * {@linkplain #isTerminated() terminated}. + * + * @return {@code true} if all executors in this group have shut down and are no longer accepting any new tasks. + */ + boolean isShutdown(); + + /** + * Returns {@code true} if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} are + * {@linkplain #isShutdown() shut down}, and all of their tasks have completed. + * + * @return {@code true} if all executors in this group have terminated. + */ + default boolean isTerminated() { + return terminationFuture().isDone(); + } + + /** + * Wait for this {@link EventExecutorGroup} to {@linkplain #isTerminated() terminate}, up to the given timeout. + * + * @param timeout The non-negative maximum amount of time to wait for the executor group to terminate. + * @param unit The non-null time unit of the timeout. + * @return {@code true} if the executor group terminated within the specific timeout. + * @throws InterruptedException If this thread was {@linkplain Thread#interrupt() interrupted} while waiting for + * executor group to terminate. + */ + default boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return terminationFuture().await(timeout, unit); + } + /** * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values. * * @return the {@link #terminationFuture()} */ - default Future shutdownGracefully() { + default Future shutdownGracefully() { return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); } /** * Signals this executor that the caller wants the executor to be shut down. Once this method is called, * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down. - * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for 'the quiet period' - * (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period, - * it is guaranteed to be accepted and the quiet period will start over. + * This method ensures that no tasks are submitted for 'the quiet period' (usually a couple seconds) before + * it shuts itself down. If a task is submitted during the quiet period, it is guaranteed to be accepted and the + * quiet period will start over. * * @param quietPeriod the quiet period as described in the documentation - * @param timeout the maximum amount of time to wait until the executor is {@linkplain #shutdown()} - * regardless if a task was submitted during the quiet period + * @param timeout the maximum amount of time to wait until the executor is + * {@linkplain #isShuttingDown() shutting down} regardless if a task was submitted during the quiet period. * @param unit the unit of {@code quietPeriod} and {@code timeout} * * @return the {@link #terminationFuture()} */ - Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); + Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); /** * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this * {@link EventExecutorGroup} have been terminated. + * + * @return The {@link Future} representing the termination of this {@link EventExecutorGroup}. */ - Future terminationFuture(); - - /** - * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. - */ - @Override - @Deprecated - void shutdown(); - - /** - * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. - */ - @Override - @Deprecated - default List shutdownNow() { - shutdown(); - return Collections.emptyList(); - } + Future terminationFuture(); /** * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}. @@ -98,66 +120,116 @@ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable iterator(); - @Override - default Future submit(Runnable task) { + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that produces a {@code null} result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @return A future that represents the completion of the submitted task. + */ + default Future submit(Runnable task) { return next().submit(task); } - @Override + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that produces the given result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @param result The value that the returned future will complete with, if the task completes successfully. + * @param The type of the future result. + * @return A future that represents the completion of the submitted task. + */ default Future submit(Runnable task, T result) { return next().submit(task, result); } - @Override + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that will return the result of the callable when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @param The type of the future result. + * @return A future that represents the completion of the submitted task. + */ default Future submit(Callable task) { return next().submit(task); } - @Override - default ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return next().schedule(command, delay, unit); + /** + * Schedule the given task for execution after the given delay, in the next available {@link EventExecutor} + * in this group, and return a future that produces a {@code null} result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup} after the given delay. + * @param delay A positive time delay, in the given time unit. + * @param unit The non-null time unit for the delay. + * @return A future that represents the completion of the scheduled task. + */ + default Future schedule(Runnable task, long delay, TimeUnit unit) { + return next().schedule(task, delay, unit); + } + + /** + * Schedule the given task for execution after the given delay, in the next available {@link EventExecutor} + * in this group, and return a future that will return the result of the callable when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup} after the given delay. + * @param delay A positive time delay, in the given time unit. + * @param unit The non-null time unit for the delay. + * @param The type of the future result. + * @return A future that represents the completion of the scheduled task. + */ + default Future schedule(Callable task, long delay, TimeUnit unit) { + return next().schedule(task, delay, unit); + } + + /** + * Schedule the given task for periodic execution in the next available {@link EventExecutor}. + * The first execution will occur after the given initial delay, and the following repeated executions will occur + * with the given period of time between each execution is started. + * If the task takes longer to complete than the requested period, then the following executions will be delayed, + * rather than allowing multiple instances of the task to run concurrently. + *

+ * The task will be executed repeatedly until it either fails with an exception, or its future is + * {@linkplain Future#cancel() cancelled}. The future thus will never complete successfully. + * + * @param task The task that should be scheduled to execute at a fixed rate in this {@link EventExecutorGroup}. + * @param initialDelay The positive initial delay for the first task execution, in terms of the given time unit. + * @param period The positive period for the execution frequency to use after the first execution has started, + * in terms of the given time unit. + * @param unit The non-null time unit for the delay and period. + * @return A future that represents the recurring task, and which can be cancelled to stop future executions. + */ + default Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return next().scheduleAtFixedRate(task, initialDelay, period, unit); + } + + /** + * Schedule the given task for periodic execution in the next available {@link EventExecutor}. + * The first execution will occur after the given initial delay, and the following repeated executions will occur + * with the given subsequent delay between one task completing and the next task starting. + * The delay from the completion of one task, to the start of the next, stays unchanged regardless of how long a + * task takes to complete. + *

+ * This is in contrast to {@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)} which varies the delays + * between the tasks in order to hit a given frequency. + *

+ * The task will be executed repeatedly until it either fails with an exception, or its future is + * {@linkplain Future#cancel() cancelled}. The future thus will never complete successfully. + * + * @param task The task that should be scheduled to execute with fixed delays in this {@link EventExecutorGroup}. + * @param initialDelay The positive initial delay for the first task execution, in terms of the given time unit. + * @param delay The positive subsequent delay between task, to use after the first execution has completed, + * in terms of the given time unit. + * @param unit The non-null time unit for the delays. + * @return A future that represents the recurring task, and which can be cancelled to stop future executions. + */ + default Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return next().scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override - default ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return next().schedule(callable, delay, unit); - } - - @Override - default ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return next().scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - default ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return next().scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - @Override - default List> invokeAll(Collection> tasks) - throws InterruptedException { - return next().invokeAll(tasks); - } - - @Override - default List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return next().invokeAll(tasks, timeout, unit); - } - - @Override - default T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return next().invokeAny(tasks); - } - - @Override - default T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return next().invokeAny(tasks, timeout, unit); - } - - @Override - default void execute(Runnable command) { - next().execute(command); + default void execute(Runnable task) { + next().execute(task); } } diff --git a/common/src/main/java/io/netty/util/concurrent/Future.java b/common/src/main/java/io/netty/util/concurrent/Future.java index 7f946fbc7e..f2d80c85da 100644 --- a/common/src/main/java/io/netty/util/concurrent/Future.java +++ b/common/src/main/java/io/netty/util/concurrent/Future.java @@ -150,8 +150,7 @@ import java.util.function.Function; * } * */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface Future extends java.util.concurrent.Future, AsynchronousResult { +public interface Future extends AsynchronousResult { /** * Adds the specified listener to this future. The specified listener is notified when this future is {@linkplain * #isDone() done}. If this future is already completed, the specified listener is notified immediately. @@ -236,14 +235,15 @@ public interface Future extends java.util.concurrent.Future, AsynchronousR boolean awaitUninterruptibly(long timeoutMillis); /** - * {@inheritDoc} - *

- * If the cancellation was successful it will fail the future with a {@link CancellationException}. + * Get the result of this future, if it has completed. + * If the future has failed, then an {@link ExecutionException} will be thrown instead. + * If the future has not yet completed, then this method will block until it completes. + * + * @return The result of the task execution, if it completed successfully. + * @throws InterruptedException If the call was blocked, waiting for the future to complete, and the thread was + * {@linkplain Thread#interrupt() interrupted}. + * @throws ExecutionException If the task failed, either by throwing an exception or through cancellation. */ - @Override - boolean cancel(boolean mayInterruptIfRunning); - - @Override default V get() throws InterruptedException, ExecutionException { await(); @@ -257,7 +257,24 @@ public interface Future extends java.util.concurrent.Future, AsynchronousR throw new ExecutionException(cause); } - @Override + /** + * Get the result of this future, if it has completed. + * If the future has failed, then an {@link ExecutionException} will be thrown instead. + * If the future has not yet completed, then this method will block, waiting up to the given timeout for the future + * to complete. + * If the future does not complete within the specified timeout, then a {@link TimeoutException} will be thrown. + * If the timeout is zero, then this method will not block, and instead either get the result or failure of the + * future if completed, or immediately throw a {@link TimeoutException} if not yet completed. + * + * @param timeout The non-negative maximum amount of time, in terms of the given time unit, to wait for the + * completion of the future. + * @param unit The time unit for the timeout. + * @return The value of the successfully completed future. + * @throws InterruptedException If this call was blocking and this thread got + * {@linkplain Thread#interrupt() interrupted}. + * @throws ExecutionException If the task failed, either by throwing an exception, or through cancellation. + * @throws TimeoutException If the future did not complete within the specified timeout. + */ default V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) { Throwable cause = cause(); @@ -280,6 +297,13 @@ public interface Future extends java.util.concurrent.Future, AsynchronousR return new DefaultFutureCompletionStage<>(this); } + /** + * Returns a {@link java.util.concurrent.Future JDK Future that reflects the state of this {@link Future}. + */ + default java.util.concurrent.Future asJdkFuture() { + return new DefaultFutureCompletionStage<>(this); + } + /** * Creates a new {@link Future} that will complete with the result of this {@link Future} mapped * through the given mapper function. diff --git a/common/src/main/java/io/netty/util/concurrent/Futures.java b/common/src/main/java/io/netty/util/concurrent/Futures.java index f9eeb001c2..09d00c921c 100644 --- a/common/src/main/java/io/netty/util/concurrent/Futures.java +++ b/common/src/main/java/io/netty/util/concurrent/Futures.java @@ -136,7 +136,7 @@ final class Futures { @Override public void operationComplete(Future context, Future future) throws Exception { if (future.isCancelled()) { - context.cancel(false); + context.cancel(); } } } diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index bc1bd54e78..14b00ad5ef 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -67,7 +67,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im private final AtomicBoolean started = new AtomicBoolean(); volatile Thread thread; - private final Future terminationFuture = DefaultPromise.newFailedPromise( + private final Future terminationFuture = DefaultPromise.newFailedPromise( this, new UnsupportedOperationException()).asFuture(); private GlobalEventExecutor() { @@ -150,21 +150,15 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture; } - @Override - @Deprecated - public void shutdown() { - throw new UnsupportedOperationException(); - } - @Override public boolean isShuttingDown() { return false; diff --git a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java index d2c945320f..32602f4867 100644 --- a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -25,6 +23,8 @@ import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import static java.util.Objects.requireNonNull; + /** * Executes {@link Runnable} objects in the caller's thread. If the {@link #execute(Runnable)} is reentrant it will be * queued until the original {@link Runnable} finishes execution. @@ -54,7 +54,7 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor { } }; - private final Future terminationFuture = DefaultPromise.newFailedPromise( + private final Future terminationFuture = DefaultPromise.newFailedPromise( GlobalEventExecutor.INSTANCE, new UnsupportedOperationException()).asFuture(); private ImmediateEventExecutor() { } @@ -65,19 +65,15 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture; } - @Override - @Deprecated - public void shutdown() { } - @Override public boolean isShuttingDown() { return false; @@ -99,14 +95,14 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor { } @Override - public void execute(Runnable command) { - requireNonNull(command, "command"); + public void execute(Runnable task) { + requireNonNull(task, "command"); if (!RUNNING.get()) { RUNNING.set(true); try { - command.run(); + task.run(); } catch (Throwable cause) { - logger.info("Throwable caught while executing Runnable {}", command, cause); + logger.info("Throwable caught while executing Runnable {}", task, cause); } finally { Queue delayedRunnables = DELAYED_RUNNABLES.get(); Runnable runnable; @@ -120,7 +116,7 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor { RUNNING.set(false); } } else { - DELAYED_RUNNABLES.get().add(command); + DELAYED_RUNNABLES.get().add(task); } } @@ -130,23 +126,23 @@ public final class ImmediateEventExecutor extends AbstractEventExecutor { } @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { + public Future schedule(Runnable task, long delay, + TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index ae05112c7c..b22b2691b1 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -38,7 +38,7 @@ public class MultithreadEventExecutorGroup implements EventExecutorGroup { private final EventExecutor[] children; private final List readonlyChildren; private final AtomicInteger terminatedChildren = new AtomicInteger(); - private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); private final boolean powerOfTwo; /** @@ -223,7 +223,7 @@ public class MultithreadEventExecutorGroup implements EventExecutorGroup { } @Override - public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { for (EventExecutor l: children) { l.shutdownGracefully(quietPeriod, timeout, unit); } @@ -231,18 +231,10 @@ public class MultithreadEventExecutorGroup implements EventExecutorGroup { } @Override - public final Future terminationFuture() { + public final Future terminationFuture() { return terminationFuture.asFuture(); } - @Override - @Deprecated - public final void shutdown() { - for (EventExecutor l: children) { - l.shutdown(); - } - } - @Override public final boolean isShuttingDown() { for (EventExecutor l: children) { diff --git a/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java index 529127a1e0..318dd47c86 100644 --- a/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java @@ -15,23 +15,19 @@ */ package io.netty.util.concurrent; -import static io.netty.util.internal.ObjectUtil.checkPositive; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.UnstableApi; -import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static io.netty.util.internal.ObjectUtil.checkPositive; +import static java.util.Objects.requireNonNull; + /** * {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what * {@link EventExecutor} (and therefore {@link Thread}) will be used to execute the {@link Runnable}s. @@ -83,32 +79,20 @@ public final class NonStickyEventExecutorGroup implements EventExecutorGroup { } @Override - public Future shutdownGracefully() { + public Future shutdownGracefully() { return group.shutdownGracefully(); } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return group.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return group.terminationFuture(); } - @SuppressWarnings("deprecation") - @Override - public void shutdown() { - group.shutdown(); - } - - @SuppressWarnings("deprecation") - @Override - public List shutdownNow() { - return group.shutdownNow(); - } - @Override public EventExecutor next() { return newExecutor(group.next()); @@ -136,7 +120,7 @@ public final class NonStickyEventExecutorGroup implements EventExecutorGroup { } @Override - public Future submit(Runnable task) { + public Future submit(Runnable task) { return group.submit(task); } @@ -151,23 +135,23 @@ public final class NonStickyEventExecutorGroup implements EventExecutorGroup { } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return group.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return group.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return group.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return group.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return group.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return group.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return group.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return group.scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override @@ -186,31 +170,8 @@ public final class NonStickyEventExecutorGroup implements EventExecutorGroup { } @Override - public List> invokeAll( - Collection> tasks) throws InterruptedException { - return group.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return group.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return group.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return group.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - group.execute(command); + public void execute(Runnable task) { + group.execute(task); } private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor @@ -294,20 +255,15 @@ public final class NonStickyEventExecutorGroup implements EventExecutorGroup { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return executor.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return executor.terminationFuture(); } - @Override - public void shutdown() { - executor.shutdown(); - } - @Override public boolean isShutdown() { return executor.isShutdown(); @@ -324,8 +280,8 @@ public final class NonStickyEventExecutorGroup implements EventExecutorGroup { } @Override - public void execute(Runnable command) { - if (!tasks.offer(command)) { + public void execute(Runnable task) { + if (!tasks.offer(task)) { throw new RejectedExecutionException(); } if (state.compareAndSet(NONE, SUBMITTED)) { @@ -336,25 +292,25 @@ public final class NonStickyEventExecutorGroup implements EventExecutorGroup { } @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { + public Future schedule(Runnable task, long delay, + TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate( + Runnable task, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } } diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java index 32698bf755..1c88cd7e3d 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java @@ -121,6 +121,11 @@ class PromiseTask extends DefaultPromise implements RunnableFuture { return super.setUncancellable(); } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return cancel(); + } + @Override protected StringBuilder toStringBuilder() { StringBuilder buf = super.toStringBuilder(); diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java b/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java index 93613b551f..ab829256ba 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java @@ -18,8 +18,7 @@ package io.netty.util.concurrent; /** * A combination of {@link java.util.concurrent.RunnableFuture} and {@link Future}. */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface RunnableFuture extends java.util.concurrent.RunnableFuture, Future { +public interface RunnableFuture extends Runnable, Future { @Override RunnableFuture addListener(FutureListener listener); diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java b/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java index 9ba77d8c07..4069d8e4cc 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.StringUtil; import java.util.concurrent.Callable; @@ -24,6 +22,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Objects.requireNonNull; + final class RunnableFutureAdapter implements RunnableFuture { private final Promise promise; @@ -134,14 +134,9 @@ final class RunnableFutureAdapter implements RunnableFuture { } } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); - } - @Override public boolean cancel() { - return cancel(false); + return future.cancel(); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java index b286d02a92..5376760992 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java @@ -16,12 +16,17 @@ package io.netty.util.concurrent; /** - * A combination of {@link java.util.concurrent.RunnableScheduledFuture}, {@link RunnableFuture} and - * {@link ScheduledFuture}. + * A combination of {@link RunnableFuture} and {@link Comparable} (sorting by their next deadline), + * with additional methods for scheduling, periodicity, and delay. */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface RunnableScheduledFuture extends - java.util.concurrent.RunnableScheduledFuture, RunnableFuture, ScheduledFuture { +public interface RunnableScheduledFuture extends RunnableFuture, Comparable> { + /** + * Return {@code true} if the task is periodic, which means it may be executed multiple times, as opposed to a + * delayed task or a normal task, that only execute once. + * + * @return {@code true} if this task is periodic, otherwise {@code false}. + */ + boolean isPeriodic(); /** * Returns the deadline in nanos when the {@link #run()} method should be called again. diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java index 5129ef5a38..2d1cd510f1 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java @@ -16,19 +16,17 @@ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.StringUtil; import java.util.concurrent.Callable; -import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -@SuppressWarnings("ComparableImplementedButEqualsNotOverridden") +import static java.util.Objects.requireNonNull; + final class RunnableScheduledFutureAdapter implements AbstractScheduledEventExecutor.RunnableScheduledFutureNode { private static final AtomicLong NEXT_TASK_ID = new AtomicLong(); @@ -75,12 +73,7 @@ final class RunnableScheduledFutureAdapter implements AbstractScheduledEventE } @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { + public int compareTo(RunnableScheduledFuture o) { if (this == o) { return 0; } @@ -100,6 +93,23 @@ final class RunnableScheduledFutureAdapter implements AbstractScheduledEventE } } + @Override + public int hashCode() { + return Long.hashCode(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof RunnableScheduledFutureAdapter) { + RunnableScheduledFutureAdapter adaptor = (RunnableScheduledFutureAdapter) obj; + return id == adaptor.id; + } + return false; + } + @Override public void run() { try { @@ -130,23 +140,15 @@ final class RunnableScheduledFutureAdapter implements AbstractScheduledEventE } } - /** - * @param mayInterruptIfRunning this value has no effect in this implementation. - */ @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean canceled = future.cancel(mayInterruptIfRunning); + public boolean cancel() { + boolean canceled = future.cancel(); if (canceled) { executor.removeScheduled(this); } return canceled; } - @Override - public boolean cancel() { - return cancel(false); - } - @Override public boolean isSuccess() { return promise.isSuccess(); diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java deleted file mode 100644 index a6a91f6314..0000000000 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -/** - * The result of an scheduled asynchronous operation. - */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface ScheduledFuture extends Future, java.util.concurrent.ScheduledFuture { -} diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 3bd3515088..7ccd11b118 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -22,21 +22,17 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.lang.Thread.State; import java.util.ArrayList; -import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -95,7 +91,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im private volatile long gracefulShutdownTimeout; private long gracefulShutdownStartTime; - private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); + private final Promise terminationFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); /** * Create a new instance @@ -500,7 +496,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } @Override - public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } @@ -538,7 +534,6 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { - //System.err.println(oldState + " " + newState + " " + this); break; } } @@ -560,58 +555,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } @Override - public final Future terminationFuture() { + public final Future terminationFuture() { return terminationFuture.asFuture(); } - @Override - @Deprecated - public final void shutdown() { - if (isShutdown()) { - return; - } - - boolean inEventLoop = inEventLoop(); - boolean wakeup; - int oldState; - for (;;) { - if (isShuttingDown()) { - return; - } - int newState; - wakeup = true; - oldState = state; - if (inEventLoop) { - newState = ST_SHUTDOWN; - } else { - switch (oldState) { - case ST_NOT_STARTED: - case ST_STARTED: - case ST_SHUTTING_DOWN: - newState = ST_SHUTDOWN; - break; - default: - newState = oldState; - wakeup = false; - } - } - if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { - break; - } - } - - if (ensureThreadStarted(oldState)) { - return; - } - - if (wakeup) { - taskQueue.offer(WAKEUP_TASK); - if (!addTaskWakesUp) { - wakeup(inEventLoop); - } - } - } - @Override public final boolean isShuttingDown() { return state >= ST_SHUTTING_DOWN; @@ -732,39 +679,6 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } } - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - throwIfInEventLoop("invokeAny"); - return super.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throwIfInEventLoop("invokeAny"); - return super.invokeAny(tasks, timeout, unit); - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - throwIfInEventLoop("invokeAll"); - return super.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - throwIfInEventLoop("invokeAll"); - return super.invokeAll(tasks, timeout, unit); - } - - private void throwIfInEventLoop(String method) { - if (inEventLoop()) { - throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed"); - } - } - /** * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}. * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until diff --git a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java index bd5eac08f4..4273e06c0d 100644 --- a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java @@ -18,7 +18,7 @@ package io.netty.util.concurrent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.RejectedExecutionHandler; @@ -36,25 +36,28 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; * * Because it provides no ordering care should be taken when using it! */ -public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor { +@SuppressWarnings("unchecked") +public final class UnorderedThreadPoolEventExecutor implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance( UnorderedThreadPoolEventExecutor.class); - private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final InnerScheduledThreadPoolExecutor executor; /** * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int, ThreadFactory)} * using {@link DefaultThreadFactory}. */ public UnorderedThreadPoolEventExecutor(int corePoolSize) { - this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class)); + DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory); } /** * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory)} */ public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) { - super(corePoolSize, threadFactory); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory); } /** @@ -62,7 +65,8 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE * ThreadFactory, java.util.concurrent.RejectedExecutionHandler)} using {@link DefaultThreadFactory}. */ public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) { - this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler); + DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler); } /** @@ -70,7 +74,7 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE */ public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { - super(corePoolSize, threadFactory, handler); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler); } @Override @@ -84,94 +88,97 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE } @Override - public List shutdownNow() { - List tasks = super.shutdownNow(); - terminationFuture.trySuccess(null); - return tasks; + public boolean isShutdown() { + return executor.isShutdown(); } @Override - public void shutdown() { - super.shutdown(); - terminationFuture.trySuccess(null); + public boolean isTerminated() { + return executor.isTerminated(); } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { // TODO: At the moment this just calls shutdown but we may be able to do something more smart here which // respects the quietPeriod and timeout. - shutdown(); + executor.shutdown(); return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture.asFuture(); } @Override - protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { - return runnable instanceof NonNotifyRunnable ? - task : new RunnableScheduledFutureTask<>(this, runnable, task); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return (Future) executor.schedule(task, delay, unit); } @Override - protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { - return new RunnableScheduledFutureTask<>(this, callable, task); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return (Future) executor.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return (ScheduledFuture) super.schedule(command, delay, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return (Future) executor.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return (ScheduledFuture) super.schedule(callable, delay, unit); + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return (Future) executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return (ScheduledFuture) super.scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return (ScheduledFuture) super.scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - @Override - public Future submit(Runnable task) { - return (Future) super.submit(task); + public Future submit(Runnable task) { + return (Future) executor.submit(task); } @Override public Future submit(Runnable task, T result) { - return (Future) super.submit(task, result); + return (Future) executor.submit(task, result); } @Override public Future submit(Callable task) { - return (Future) super.submit(task); + return (Future) executor.submit(task); } @Override - public void execute(Runnable command) { - super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS); + public void execute(Runnable task) { + executor.schedule(new NonNotifyRunnable(task), 0, NANOSECONDS); } + /** + * Return the task queue of the underlying {@link java.util.concurrent.Executor} instance. + *

+ * Visible for testing. + * + * @return The task queue of this executor. + */ + BlockingQueue getQueue() { + return executor.getQueue(); + } + + /** + * Note: this class has a natural ordering that is inconsistent with equals. + */ private static final class RunnableScheduledFutureTask extends PromiseTask - implements RunnableScheduledFuture, ScheduledFuture { + implements RunnableScheduledFuture { private final RunnableScheduledFuture future; - RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, - RunnableScheduledFuture future) { + RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, RunnableScheduledFuture future) { super(executor, runnable, null); this.future = future; } - RunnableScheduledFutureTask(EventExecutor executor, Callable callable, - RunnableScheduledFuture future) { + RunnableScheduledFutureTask(EventExecutor executor, Callable callable, RunnableScheduledFuture future) { super(executor, callable); this.future = future; } @@ -228,4 +235,30 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE task.run(); } } + + private static final class InnerScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + private final EventExecutor eventExecutor; + + InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + this.eventExecutor = eventExecutor; + } + + InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, threadFactory, handler); + this.eventExecutor = eventExecutor; + } + + @Override + protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { + return runnable instanceof NonNotifyRunnable ? + task : new RunnableScheduledFutureTask<>(eventExecutor, runnable, task); + } + + @Override + protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { + return new RunnableScheduledFutureTask<>(eventExecutor, callable, task); + } + } } diff --git a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java index f61ff387ff..a0ae541103 100644 --- a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java @@ -35,8 +35,8 @@ public class AbstractScheduledEventExecutorTest { @Test public void testScheduleRunnableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_RUNNABLE, 0, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_RUNNABLE, 0, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -44,8 +44,8 @@ public class AbstractScheduledEventExecutorTest { @Test public void testScheduleRunnableNegative() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_RUNNABLE, -1, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_RUNNABLE, -1, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -53,8 +53,8 @@ public class AbstractScheduledEventExecutorTest { @Test public void testScheduleCallableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_CALLABLE, 0, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_CALLABLE, 0, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -62,12 +62,16 @@ public class AbstractScheduledEventExecutorTest { @Test public void testScheduleCallableNegative() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_CALLABLE, -1, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_CALLABLE, -1, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } + private static long getDelay(Future future) { + return ((RunnableScheduledFuture) future).delayNanos(); + } + @Test public void testScheduleAtFixedRateRunnableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); @@ -113,17 +117,12 @@ public class AbstractScheduledEventExecutorTest { } @Override - public void shutdown() { - // NOOP - } - - @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { throw new UnsupportedOperationException(); } @@ -143,7 +142,7 @@ public class AbstractScheduledEventExecutorTest { } @Override - public void execute(Runnable command) { + public void execute(Runnable task) { throw new UnsupportedOperationException(); } } diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index bc276dc988..74aa677e04 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -80,19 +80,15 @@ public class DefaultPromiseTest { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return null; } @Override - public Future terminationFuture() { + public Future terminationFuture() { return null; } - @Override - public void shutdown() { - } - @Override public boolean isShutdown() { return false; @@ -109,23 +105,23 @@ public class DefaultPromiseTest { } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + public Future schedule(Runnable task, long delay, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, - TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, + TimeUnit unit) { return fail("Cannot schedule commands"); } @@ -135,7 +131,7 @@ public class DefaultPromiseTest { } @Override - public void execute(Runnable command) { + public void execute(Runnable task) { fail("Cannot schedule commands"); } } @@ -145,7 +141,7 @@ public class DefaultPromiseTest { EventExecutor executor = new RejectingEventExecutor(); DefaultPromise promise = new DefaultPromise(executor); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertTrue(promise.isCancelled()); } @@ -174,23 +170,33 @@ public class DefaultPromiseTest { @Test public void testCancellationExceptionIsThrownWhenBlockingGet() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThrows(CancellationException.class, promise::get); } @Test public void testCancellationExceptionIsThrownWhenBlockingGetWithTimeout() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThrows(CancellationException.class, () -> promise.get(1, TimeUnit.SECONDS)); } @Test public void testCancellationExceptionIsReturnedAsCause() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThat(promise.cause()).isInstanceOf(CancellationException.class); assertTrue(promise.isFailed()); + assertTrue(promise.isDone()); + } + + @Test + public void uncancellablePromiseIsNotDone() { + DefaultPromise promise = new DefaultPromise<>(INSTANCE); + promise.setUncancellable(); + assertFalse(promise.isDone()); + assertFalse(promise.isCancellable()); + assertFalse(promise.isCancelled()); } @Test @@ -390,6 +396,22 @@ public class DefaultPromiseTest { assertEquals("success", promise.getNow()); } + @Test + public void cancellingUncancellablePromiseDoesNotCompleteIt() { + DefaultPromise promise = new DefaultPromise<>(INSTANCE); + promise.setUncancellable(); + promise.cancel(); + assertFalse(promise.isCancelled()); + assertFalse(promise.isDone()); + assertFalse(promise.isFailed()); + assertFalse(promise.isSuccess()); + promise.setSuccess(null); + assertFalse(promise.isCancelled()); + assertTrue(promise.isDone()); + assertFalse(promise.isFailed()); + assertTrue(promise.isSuccess()); + } + @Test public void throwUncheckedSync() throws InterruptedException { Exception exception = new Exception(); @@ -421,7 +443,7 @@ public class DefaultPromiseTest { @Test public void throwCancelled() throws InterruptedException { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - promise.cancel(true); + promise.cancel(); assertThrows(CancellationException.class, promise::sync); } diff --git a/common/src/test/java/io/netty/util/concurrent/FuturesTest.java b/common/src/test/java/io/netty/util/concurrent/FuturesTest.java index 862e52f213..75109a9115 100644 --- a/common/src/test/java/io/netty/util/concurrent/FuturesTest.java +++ b/common/src/test/java/io/netty/util/concurrent/FuturesTest.java @@ -90,7 +90,7 @@ class FuturesTest { public void cancelOnFutureFromMapMustCancelOriginalFuture() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.map(i -> i.toString()); - strFut.cancel(false); + strFut.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -99,7 +99,7 @@ class FuturesTest { public void cancelOnOriginalFutureMustCancelFutureFromMap() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.map(i -> i.toString()); - promise.cancel(false); + promise.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -165,7 +165,7 @@ class FuturesTest { public void cancelOnFutureFromFlatMapMustCancelOriginalFuture() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> INSTANCE.newSucceededFuture(i.toString())); - strFut.cancel(false); + strFut.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -174,7 +174,7 @@ class FuturesTest { public void cancelOnOriginalFutureMustCancelFutureFromFlatMap() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> INSTANCE.newSucceededFuture(i.toString())); - promise.cancel(false); + promise.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -184,7 +184,7 @@ class FuturesTest { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> { Future future = new DefaultPromise<>(INSTANCE); - future.cancel(false); + future.cancel(); return future; }); @@ -254,7 +254,7 @@ class FuturesTest { DefaultPromise promise2 = new DefaultPromise<>(executor); promise.cascadeTo(promise2); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertTrue(promise.isCancelled()); assertTrue(promise2.await(1, SECONDS)); assertTrue(promise2.isCancelled()); @@ -267,7 +267,7 @@ class FuturesTest { DefaultPromise promise2 = new DefaultPromise<>(executor); promise.cascadeTo(promise2); - assertTrue(promise2.cancel(false)); + assertTrue(promise2.cancel()); assertTrue(promise2.isCancelled()); // diff --git a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java index 1837bb1c96..78550e0504 100644 --- a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java @@ -38,11 +38,7 @@ public class GlobalEventExecutorTest { @BeforeEach public void setUp() throws Exception { // Wait until the global executor is stopped (just in case there is a task running due to previous test cases) - for (;;) { - if (e.thread == null || !e.thread.isAlive()) { - break; - } - + while (e.thread != null && e.thread.isAlive()) { Thread.sleep(50); } } @@ -76,7 +72,7 @@ public class GlobalEventExecutorTest { @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) public void testScheduledTasks() throws Exception { TestRunnable task = new TestRunnable(0); - ScheduledFuture f = e.schedule(task, 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(task, 1500, TimeUnit.MILLISECONDS); f.sync(); assertThat(task.ran.get(), is(true)); @@ -115,7 +111,7 @@ public class GlobalEventExecutorTest { //add scheduled task TestRunnable scheduledTask = new TestRunnable(0); - ScheduledFuture f = e.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); //add task TestRunnable afterTask = new TestRunnable(0); @@ -134,7 +130,7 @@ public class GlobalEventExecutorTest { //for https://github.com/netty/netty/issues/1614 //add scheduled task TestRunnable t = new TestRunnable(0); - final ScheduledFuture f = e.schedule(t, 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(t, 1500, TimeUnit.MILLISECONDS); //ensure always has at least one task in taskQueue //check if scheduled tasks are triggered diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index ddfe32587b..379990626d 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -17,10 +17,7 @@ package io.netty.util.concurrent; import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -99,82 +96,6 @@ public class SingleThreadEventExecutorTest { executor.shutdownGracefully(); } - @Test - public void testInvokeAnyInEventLoop() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(true, false)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAnyInEventLoopWithTimeout() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(true, true)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAllInEventLoop() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(false, false)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAllInEventLoopWithTimeout() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(false, true)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - private static void testInvokeInEventLoop(final boolean any, final boolean timeout) { - final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(Executors.defaultThreadFactory()) { - @Override - protected void run() { - while (!confirmShutdown()) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - } - } - } - }; - try { - final Promise promise = executor.newPromise(); - executor.execute(() -> { - try { - Set> set = Collections.singleton(() -> { - promise.setFailure(new AssertionError("Should never execute the Callable")); - return Boolean.TRUE; - }); - if (any) { - if (timeout) { - executor.invokeAny(set, 10, TimeUnit.SECONDS); - } else { - executor.invokeAny(set); - } - } else { - if (timeout) { - executor.invokeAll(set, 10, TimeUnit.SECONDS); - } else { - executor.invokeAll(set); - } - } - promise.setFailure(new AssertionError("Should never reach here")); - } catch (Throwable cause) { - promise.setFailure(cause); - } - }); - promise.asFuture().syncUninterruptibly(); - } finally { - executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); - } - } - @Test public void testTaskAddedAfterShutdownNotAbandoned() throws Exception { @@ -266,7 +187,7 @@ public class SingleThreadEventExecutorTest { //add scheduled task TestRunnable scheduledTask = new TestRunnable(); - ScheduledFuture f = executor.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); + Future f = executor.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); //add task TestRunnable afterTask = new TestRunnable(); @@ -300,7 +221,7 @@ public class SingleThreadEventExecutorTest { assertTimeoutPreemptively(ofSeconds(5), () -> { //add scheduled task TestRunnable t = new TestRunnable(); - final ScheduledFuture f = executor.schedule(t, 1500, TimeUnit.MILLISECONDS); + Future f = executor.schedule(t, 1500, TimeUnit.MILLISECONDS); //ensure always has at least one task in taskQueue //check if scheduled tasks are triggered diff --git a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java index aff141610e..caeabdfbe5 100644 --- a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java @@ -65,7 +65,7 @@ public class UnorderedThreadPoolEventExecutorTest { try { latch.await(); } finally { - future.cancel(true); + future.cancel(); executor.shutdownGracefully(); } } @@ -105,4 +105,41 @@ public class UnorderedThreadPoolEventExecutorTest { executor.shutdownGracefully(); } } + + @Test + public void futuresMustHaveCorrectExecutor() { + UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1); + Runnable runnable = () -> { + }; + Callable callable = () -> null; + Future future = null; + + try { + future = executor.schedule(runnable, 0, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.schedule(callable, 0, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.scheduleWithFixedDelay(runnable, 0, 1, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.submit(runnable); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.submit(callable); + assertSame(executor, future.executor()); + } finally { + future.cancel(); + executor.shutdownGracefully(); + } + } } diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java index e915577f98..82f2fc7676 100644 --- a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java @@ -348,7 +348,7 @@ public abstract class ProxyHandler implements ChannelHandler { private void cancelConnectTimeoutFuture() { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); connectTimeoutFuture = null; } } diff --git a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java index 62bb9e7e69..aaea42c3a8 100644 --- a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java +++ b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java @@ -205,7 +205,7 @@ public class FlushConsolidationHandler implements ChannelHandler { private void cancelScheduledFlush() { if (nextScheduledFlush != null) { - nextScheduledFlush.cancel(false); + nextScheduledFlush.cancel(); nextScheduledFlush = null; } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index c26c596f72..35551c3b73 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -2038,7 +2038,7 @@ public class SslHandler extends ByteToMessageDecoder { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } private void forceFlush(ChannelHandlerContext ctx) { @@ -2088,7 +2088,7 @@ public class SslHandler extends ByteToMessageDecoder { // Close the connection if close_notify is sent in time. flushFuture.addListener(f -> { if (timeoutFuture != null) { - timeoutFuture.cancel(false); + timeoutFuture.cancel(); } final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis; if (closeNotifyReadTimeout <= 0) { @@ -2122,7 +2122,7 @@ public class SslHandler extends ByteToMessageDecoder { // Do the close once we received the close_notify. closeFuture.addListener(future -> { if (closeNotifyReadTimeoutFuture != null) { - closeNotifyReadTimeoutFuture.cancel(false); + closeNotifyReadTimeoutFuture.cancel(); } if (ctx.channel().isActive()) { addCloseListener(ctx.close(), promise); diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 258c5525bf..7ce25394ca 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -348,15 +348,15 @@ public class IdleStateHandler implements ChannelHandler { state = 2; if (readerIdleTimeout != null) { - readerIdleTimeout.cancel(false); + readerIdleTimeout.cancel(); readerIdleTimeout = null; } if (writerIdleTimeout != null) { - writerIdleTimeout.cancel(false); + writerIdleTimeout.cancel(); writerIdleTimeout = null; } if (allIdleTimeout != null) { - allIdleTimeout.cancel(false); + allIdleTimeout.cancel(); allIdleTimeout = null; } } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index e5131bbc92..d8ceaaa599 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -117,7 +117,7 @@ public class WriteTimeoutHandler implements ChannelHandler { lastTask = null; while (task != null) { assert task.ctx.executor().inEventLoop(); - task.scheduledFuture.cancel(false); + task.scheduledFuture.cancel(); WriteTimeoutTask prev = task.prev; task.prev = null; task.next = null; @@ -214,7 +214,7 @@ public class WriteTimeoutHandler implements ChannelHandler { @Override public void operationComplete(Future future) throws Exception { // scheduledFuture has already be set when reaching here - scheduledFuture.cancel(false); + scheduledFuture.cancel(); // Check if its safe to modify the "doubly-linked-list" that we maintain. If its not we will schedule the // modification so its picked up by the executor.. diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java index aa0ec57808..0915b6f373 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java @@ -15,13 +15,13 @@ */ package io.netty.handler.traffic; -import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; - import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler.PerChannel; +import io.netty.util.concurrent.EventExecutorGroup; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; + /** * Version for {@link GlobalChannelTrafficShapingHandler}. * This TrafficCounter is the Global one, and its special property is to directly handle @@ -36,7 +36,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter { * @param checkInterval the checkInterval in millisecond between two computations. */ public GlobalChannelTrafficCounter(GlobalChannelTrafficShapingHandler trafficShapingHandler, - ScheduledExecutorService executor, String name, long checkInterval) { + EventExecutorGroup executor, String name, long checkInterval) { super(trafficShapingHandler, executor, name, checkInterval); checkNotNullWithIAE(executor, "executor"); } @@ -111,7 +111,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter { resetAccounting(milliSecondFromNano()); trafficShapingHandler.doAccounting(this); if (scheduledFuture != null) { - scheduledFuture.cancel(true); + scheduledFuture.cancel(); } } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java index 8a3de8743c..faa780845c 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.util.Attribute; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.logging.InternalLogger; @@ -33,7 +34,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -149,7 +149,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa /** * Create the global TrafficCounter */ - void createGlobalTrafficCounter(ScheduledExecutorService executor) { + void createGlobalTrafficCounter(EventExecutorGroup executor) { // Default setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION); checkNotNullWithIAE(executor, "executor"); @@ -167,7 +167,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -182,7 +182,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa * @param maxTime * The maximum delay to wait in case of traffic excess. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit, long checkInterval, long maxTime) { @@ -196,7 +196,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -209,7 +209,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit, long checkInterval) { @@ -223,7 +223,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -233,7 +233,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa * @param readChannelLimit * 0 or a limit in bytes/s */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit) { super(writeGlobalLimit, readGlobalLimit); @@ -246,12 +246,12 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param checkInterval * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) { super(checkInterval); createGlobalTrafficCounter(executor); } @@ -260,9 +260,9 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) { + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor) { createGlobalTrafficCounter(executor); } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java index 89a58ac011..fbd16c8c48 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Promise; import java.util.ArrayDeque; @@ -104,7 +105,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { /** * Create the global TrafficCounter. */ - void createGlobalTrafficCounter(ScheduledExecutorService executor) { + void createGlobalTrafficCounter(EventExecutorGroup executor) { requireNonNull(executor, "executor"); TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval); setTrafficCounter(tc); @@ -120,7 +121,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit @@ -131,7 +132,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { * @param maxTime * The maximum delay to wait in case of traffic excess. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit, + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, long readLimit, long checkInterval, long maxTime) { super(writeLimit, readLimit, checkInterval, maxTime); createGlobalTrafficCounter(executor); @@ -142,7 +143,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { * default max time as delay allowed value of 15000 ms. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit @@ -151,8 +152,8 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, - long readLimit, long checkInterval) { + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, + long readLimit, long checkInterval) { super(writeLimit, readLimit, checkInterval); createGlobalTrafficCounter(executor); } @@ -162,13 +163,13 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { * default max time as delay allowed value of 15000 ms. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit * 0 or a limit in bytes/s */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, long readLimit) { super(writeLimit, readLimit); createGlobalTrafficCounter(executor); @@ -179,12 +180,12 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { * default max time as delay allowed value of 15000 ms and no limit. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param checkInterval * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) { super(checkInterval); createGlobalTrafficCounter(executor); } diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java index 283e01166e..fd384e2de2 100644 --- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -15,11 +15,13 @@ */ package io.netty.handler.traffic; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -148,7 +150,7 @@ public class TrafficCounter { /** * Executor that will run the monitor */ - final ScheduledExecutorService executor; + final EventExecutorGroup executor; /** * Monitor created once in start() */ @@ -156,7 +158,7 @@ public class TrafficCounter { /** * used in stop() to cancel the timer */ - volatile ScheduledFuture scheduledFuture; + volatile Future scheduledFuture; /** * Is Monitor active @@ -211,7 +213,7 @@ public class TrafficCounter { trafficShapingHandler.doAccounting(this); } if (scheduledFuture != null) { - scheduledFuture.cancel(true); + scheduledFuture.cancel(); } } @@ -252,7 +254,7 @@ public class TrafficCounter { * @param checkInterval * the checkInterval in millisecond between two computations. */ - public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) { + public TrafficCounter(EventExecutor executor, String name, long checkInterval) { requireNonNull(name, "name"); trafficShapingHandler = null; @@ -277,7 +279,7 @@ public class TrafficCounter { * the checkInterval in millisecond between two computations. */ public TrafficCounter( - AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor, + AbstractTrafficShapingHandler trafficShapingHandler, EventExecutorGroup executor, String name, long checkInterval) { this.name = requireNonNull(name, "name"); this.trafficShapingHandler = checkNotNullWithIAE(trafficShapingHandler, "trafficShapingHandler"); @@ -304,13 +306,12 @@ public class TrafficCounter { public void configure(long newCheckInterval) { long newInterval = newCheckInterval / 10 * 10; if (checkInterval.getAndSet(newInterval) != newInterval) { + stop(); if (newInterval <= 0) { - stop(); // No more active monitoring lastTime.set(milliSecondFromNano()); } else { // Restart - stop(); start(); } } diff --git a/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java b/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java index 22a57ddef1..80dff79e5c 100644 --- a/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java @@ -30,11 +30,12 @@ import io.netty.channel.local.LocalHandler; import io.netty.channel.local.LocalServerChannel; import io.netty.util.Attribute; import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.SingleThreadEventExecutor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -42,14 +43,14 @@ import static org.junit.jupiter.api.Assertions.assertNull; public class TrafficShapingHandlerTest { private static final long READ_LIMIT_BYTES_PER_SECOND = 1; - private static final ScheduledExecutorService SES = Executors.newSingleThreadScheduledExecutor(); + private static final EventExecutorGroup SES = new SingleThreadEventExecutor(); private static final MultithreadEventLoopGroup GROUP = new MultithreadEventLoopGroup(1, LocalHandler.newFactory()); @AfterAll public static void destroy() { GROUP.shutdownGracefully(); - SES.shutdown(); + SES.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } @Test diff --git a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java index 36f3ee9e0d..6d5b9223d9 100644 --- a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java @@ -126,7 +126,7 @@ public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark { public void tearDown() throws Exception { chan.close().sync(); serverChan.close().sync(); - future.cancel(true); + future.cancel(); group.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); abyte.release(); } diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java index c5ac0dbf81..0bbfdf5b73 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java @@ -23,8 +23,14 @@ import io.netty.channel.kqueue.KQueueHandler; import io.netty.channel.nio.NioHandler; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.RejectedExecutionHandlers; import io.netty.util.concurrent.SingleThreadEventExecutor; +import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor; import io.netty.util.internal.PlatformDependent; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -38,17 +44,12 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.infra.Blackhole; -import java.util.Collection; -import java.util.List; +import java.util.Collections; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -60,18 +61,17 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { * This executor is useful as the best burst latency performer because it won't go to sleep and won't be hit by the * cost of being awaken on both offer/consumer side. */ - private static final class SpinExecutorService implements ExecutorService { - + private static final class SpinExecutorService implements EventExecutorGroup { private static final Runnable POISON_PILL = () -> { }; private final Queue tasks; private final AtomicBoolean poisoned = new AtomicBoolean(); private final Thread executorThread; + private final Promise terminationFuture = ImmediateEventExecutor.INSTANCE.newPromise(); SpinExecutorService(int maxTasks) { tasks = PlatformDependent.newFixedMpscQueue(maxTasks); executorThread = new Thread(() -> { - final Queue tasks = SpinExecutorService.this.tasks; Runnable task; while ((task = tasks.poll()) != POISON_PILL) { if (task != null) { @@ -83,22 +83,8 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { } @Override - public void shutdown() { - if (poisoned.compareAndSet(false, true)) { - while (!tasks.offer(POISON_PILL)) { - // Just try again - } - try { - executorThread.join(); - } catch (InterruptedException e) { - //We're quite trusty :) - } - } - } - - @Override - public List shutdownNow() { - throw new UnsupportedOperationException(); + public boolean isShuttingDown() { + return poisoned.get(); } @Override @@ -121,46 +107,79 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { throw new UnsupportedOperationException(); } + @Override + public Future schedule(Runnable task, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public Future schedule(Callable task, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + @Override public Future submit(Runnable task, T result) { throw new UnsupportedOperationException(); } @Override - public Future submit(Runnable task) { + public Future submit(Runnable task) { throw new UnsupportedOperationException(); } @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throw new UnsupportedOperationException(); - } - - @Override - public void execute(Runnable command) { - if (!tasks.offer(command)) { + public void execute(Runnable task) { + if (!tasks.offer(task)) { throw new RejectedExecutionException( "If that happens, there is something wrong with the available capacity/burst size"); } } + + @Override + public Future shutdownGracefully() { + if (poisoned.compareAndSet(false, true)) { + while (!tasks.offer(POISON_PILL)) { + // Just try again + } + try { + executorThread.join(); + } catch (InterruptedException e) { + //We're quite trusty :) + } + } + terminationFuture.trySuccess(null); + return terminationFuture.asFuture(); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return shutdownGracefully(); + } + + @Override + public Future terminationFuture() { + return terminationFuture.asFuture(); + } + + @Override + public EventExecutor next() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } } private enum ExecutorType { @@ -179,8 +198,8 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { @Param({ "0", "10" }) private int work; - private ExecutorService executor; - private ExecutorService executorToShutdown; + private EventExecutorGroup executor; + private EventExecutorGroup executorToShutdown; @Setup public void setup() { @@ -199,7 +218,7 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { executorToShutdown = executor; break; case juc: - executor = Executors.newSingleThreadScheduledExecutor(); + executor = new UnorderedThreadPoolEventExecutor(1); executorToShutdown = executor; break; case nioEventLoop: @@ -230,7 +249,7 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { @TearDown public void tearDown() { - executorToShutdown.shutdown(); + executorToShutdown.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } @State(Scope.Thread) @@ -255,7 +274,7 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { //benchmark is focusing on executors with a single threaded consumer: //it would reduce the cost on consumer side while allowing to focus just //to the threads hand-off/wake-up cost - DONE_UPDATER.lazySet(PerThreadState.this, completed + 1); + DONE_UPDATER.lazySet(this, completed + 1); }; } else { completeTask = () -> { @@ -263,7 +282,7 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { //benchmark is focusing on executors with a single threaded consumer: //it would reduce the cost on consumer side while allowing to focus just //to the threads hand-off/wake-up cost - DONE_UPDATER.lazySet(PerThreadState.this, completed + 1); + DONE_UPDATER.lazySet(this, completed + 1); }; } } @@ -283,7 +302,7 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { */ public int spinWaitCompletionOf(int value) { while (true) { - final int lastRead = this.completed; + final int lastRead = completed; if (lastRead >= value) { return lastRead; } @@ -313,7 +332,7 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { } private int executeBurst(final PerThreadState state) { - final ExecutorService executor = this.executor; + final EventExecutorGroup executor = this.executor; final int burstLength = this.burstLength; final Runnable completeTask = state.completeTask; for (int i = 0; i < burstLength; i++) { diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java index 8035bfc663..7e0ff6cd32 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java @@ -68,7 +68,7 @@ public class RunnableScheduledFutureAdapterBenchmark extends AbstractMicrobenchm public Future cancelInOrder(final FuturesHolder futuresHolder) { return executor.submit(() -> { for (int i = 0; i < futuresHolder.num; i++) { - futuresHolder.futures.get(i).cancel(false); + futuresHolder.futures.get(i).cancel(); } }).syncUninterruptibly(); } @@ -77,7 +77,7 @@ public class RunnableScheduledFutureAdapterBenchmark extends AbstractMicrobenchm public Future cancelInReverseOrder(final FuturesHolder futuresHolder) { return executor.submit(() -> { for (int i = futuresHolder.num - 1; i >= 0; i--) { - futuresHolder.futures.get(i).cancel(false); + futuresHolder.futures.get(i).cancel(); } }).syncUninterruptibly(); } diff --git a/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java b/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java index b6ea697f57..d88680d90a 100644 --- a/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java @@ -19,15 +19,13 @@ import io.netty.channel.EventLoop; import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import org.openjdk.jmh.annotations.Fork; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import org.openjdk.jmh.annotations.Fork; - /** * This harness facilitates the sharing of an executor between JMH and Netty and * thus avoid measuring context switching in microbenchmarks. @@ -87,21 +85,15 @@ public class AbstractSharedExecutorMicrobenchmark extends AbstractMicrobenchmark } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return executor.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return executor.terminationFuture(); } - @Override - @Deprecated - public void shutdown() { - executor.shutdown(); - } - @Override public boolean isShuttingDown() { return executor.isShuttingDown(); @@ -128,8 +120,8 @@ public class AbstractSharedExecutorMicrobenchmark extends AbstractMicrobenchmark } @Override - public void execute(Runnable command) { - executor.execute(command); + public void execute(Runnable task) { + executor.execute(task); } @Override @@ -138,24 +130,24 @@ public class AbstractSharedExecutorMicrobenchmark extends AbstractMicrobenchmark } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return executor.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return executor.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return executor.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return executor.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return executor.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return executor.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { + return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); } } diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java index 515a0e7369..25ab8dcf6b 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java @@ -53,7 +53,7 @@ abstract class Cache { private static final Future CANCELLED_FUTURE = new Future() { @Override public boolean cancel() { - return cancel(false); + return false; } @Override @@ -131,11 +131,6 @@ abstract class Cache { return null; } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - @Override public EventExecutor executor() { throw new UnsupportedOperationException(); @@ -352,7 +347,7 @@ abstract class Cache { break; } else { // There was something else scheduled in the meantime... Cancel and try again. - newFuture.cancel(true); + newFuture.cancel(); } } else { break; @@ -401,7 +396,7 @@ abstract class Cache { } void cancel() { - future.cancel(false); + future.cancel(); } @Override diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java index 68c48ead87..1fc45d3059 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java @@ -217,7 +217,7 @@ abstract class DnsQueryContext implements FutureListener timeoutFuture = this.timeoutFuture; if (timeoutFuture != null) { this.timeoutFuture = null; - timeoutFuture.cancel(false); + timeoutFuture.cancel(); } // Remove the id from the manager as soon as the query completes. This may be because of success, failure or diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java index 2bbb85bada..26166f784c 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java @@ -987,7 +987,7 @@ abstract class DnsResolveContext { Future> f = i.next(); i.remove(); - f.cancel(false); + f.cancel(); } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java index 08d46affdd..debe7b9491 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java @@ -41,11 +41,10 @@ import static org.junit.jupiter.api.Assertions.fail; public abstract class AbstractSingleThreadEventLoopTest { @Test - @SuppressWarnings("deprecation") public void shutdownBeforeStart() throws Exception { EventLoopGroup group = new MultithreadEventLoopGroup(newIoHandlerFactory()); assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS)); - group.shutdown(); + group.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS)); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java index 83867b3185..ff711061f7 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java @@ -58,11 +58,11 @@ public class SocketCancelWriteTest extends AbstractSocketTest { Channel cc = cb.connect(sc.localAddress()).get(); Future f = cc.write(a); - assertTrue(f.cancel(false)); + assertTrue(f.cancel()); cc.writeAndFlush(b); cc.write(c); Future f2 = cc.write(d); - assertTrue(f2.cancel(false)); + assertTrue(f2.cancel()); cc.writeAndFlush(e); while (sh.counter < 3) { diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java index de24fc41e3..6897266060 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java @@ -137,7 +137,7 @@ public class SocketConnectionAttemptTest extends AbstractClientSocketTest { } } - if (future.cancel(true)) { + if (future.cancel()) { assertThat(future.isCancelled()).isTrue(); } else { // Cancellation not supported by the transport. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index da160a626f..722e7aec11 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -166,7 +166,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann Future future = connectTimeoutFuture; if (future != null) { - future.cancel(false); + future.cancel(); connectTimeoutFuture = null; } @@ -612,7 +612,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -684,7 +684,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index 5a919fa264..b4b385a28a 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -61,7 +61,7 @@ public class EpollEventLoopTest extends AbstractSingleThreadEventLoopTest { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); assertNull(capture.get()); } finally { group.shutdownGracefully(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 7388c41bb6..7a1d9e4692 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -563,7 +563,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -635,7 +635,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java index e578b7cca9..f24d2c95b3 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java @@ -41,7 +41,7 @@ public class KQueueEventLoopTest extends AbstractSingleThreadEventLoopTest { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); group.shutdownGracefully(); } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 90d14d5793..6ac7844903 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -87,6 +87,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl /** * Return the {@link EventLoop} this {@link Channel} was registered to. */ + @Override EventLoop executor(); /** diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 90d516ecce..77b27d68c2 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -45,7 +45,7 @@ public class SingleThreadEventLoop extends SingleThreadEventExecutor implements @Override public boolean canBlock() { assert inEventLoop(); - return !SingleThreadEventLoop.this.hasTasks() && !SingleThreadEventLoop.this.hasScheduledTasks(); + return !hasTasks() && !hasScheduledTasks(); } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index e44155972a..f7f9e92b6a 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -63,9 +63,9 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements } @Override - public void execute(Runnable command) { - requireNonNull(command, "command"); - tasks.add(command); + public void execute(Runnable task) { + requireNonNull(task, "command"); + tasks.add(task); if (!running) { runTasks(); } @@ -124,18 +124,12 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public Future terminationFuture() { - throw new UnsupportedOperationException(); - } - - @Override - @Deprecated - public void shutdown() { + public Future terminationFuture() { throw new UnsupportedOperationException(); } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 422816cbd0..59ee95f7ad 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -250,7 +250,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -317,7 +317,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } @@ -452,7 +452,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { Future future = connectTimeoutFuture; if (future != null) { - future.cancel(false); + future.cancel(); connectTimeoutFuture = null; } } diff --git a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java index 3d243e3100..08ccbddffc 100644 --- a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java @@ -17,7 +17,6 @@ package io.netty.channel; import io.netty.util.NetUtil; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.Test; @@ -26,6 +25,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import static io.netty.util.concurrent.ImmediateEventExecutor.INSTANCE; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; @@ -44,6 +44,7 @@ public class AbstractChannelTest { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); + when(eventLoop.newPromise()).thenReturn(INSTANCE.newPromise()); TestChannel channel = new TestChannel(eventLoop); // Using spy as otherwise intelliJ will not be able to understand that we dont want to skip the handler @@ -78,7 +79,7 @@ public class AbstractChannelTest { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); - when(eventLoop.newPromise()).thenReturn(ImmediateEventExecutor.INSTANCE.newPromise()); + when(eventLoop.newPromise()).thenAnswer(inv -> INSTANCE.newPromise()); doAnswer(invocationOnMock -> { ((Runnable) invocationOnMock.getArgument(0)).run(); @@ -134,7 +135,7 @@ public class AbstractChannelTest { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); - when(eventLoop.newPromise()).thenReturn(ImmediateEventExecutor.INSTANCE.newPromise()); + when(eventLoop.newPromise()).thenAnswer(inv -> INSTANCE.newPromise()); doAnswer(invocationOnMock -> { ((Runnable) invocationOnMock.getArgument(0)).run(); return null; diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 36ab4faaab..ff84abdf81 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -38,7 +38,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -95,14 +94,12 @@ public class SingleThreadEventLoopTest { } @Test - @SuppressWarnings("deprecation") public void shutdownBeforeStart() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); assertRejection(loopA); } @Test - @SuppressWarnings("deprecation") public void shutdownAfterStart() throws Exception { final CountDownLatch latch = new CountDownLatch(1); loopA.execute(latch::countDown); @@ -111,7 +108,7 @@ public class SingleThreadEventLoopTest { latch.await(); // Request the event loop thread to stop. - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); assertRejection(loopA); assertTrue(loopA.isShutdown()); @@ -165,7 +162,7 @@ public class SingleThreadEventLoopTest { final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 5; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleAtFixedRate(() -> { + Future f = loopA.scheduleAtFixedRate(() -> { timestamps.add(System.nanoTime()); try { Thread.sleep(50); @@ -175,13 +172,13 @@ public class SingleThreadEventLoopTest { allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); // Check if the task was run without a lag. Long firstTimestamp = null; - int cnt = 0; + long cnt = 0; for (Long t: timestamps) { if (firstTimestamp == null) { firstTimestamp = t; @@ -212,7 +209,7 @@ public class SingleThreadEventLoopTest { final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 5; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleAtFixedRate(() -> { + Future f = loopA.scheduleAtFixedRate(() -> { boolean empty = timestamps.isEmpty(); timestamps.add(System.nanoTime()); if (empty) { @@ -225,7 +222,7 @@ public class SingleThreadEventLoopTest { allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); @@ -265,7 +262,7 @@ public class SingleThreadEventLoopTest { final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 3; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleWithFixedDelay(() -> { + Future f = loopA.scheduleWithFixedDelay(() -> { timestamps.add(System.nanoTime()); try { Thread.sleep(51); @@ -275,7 +272,7 @@ public class SingleThreadEventLoopTest { allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); @@ -294,7 +291,6 @@ public class SingleThreadEventLoopTest { } @Test - @SuppressWarnings("deprecation") public void shutdownWithPendingTasks() throws Exception { final int NUM_TASKS = 3; final AtomicInteger ranTasks = new AtomicInteger(); @@ -321,7 +317,7 @@ public class SingleThreadEventLoopTest { assertEquals(1, ranTasks.get()); // Shut down the event loop to test if the other tasks are run before termination. - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); // Let the other tasks run. latch.countDown(); @@ -337,9 +333,8 @@ public class SingleThreadEventLoopTest { @Test @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) - @SuppressWarnings("deprecation") public void testRegistrationAfterShutdown() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); // Disable logging temporarily. Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); @@ -367,9 +362,8 @@ public class SingleThreadEventLoopTest { @Test @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) - @SuppressWarnings("deprecation") public void testRegistrationAfterShutdown2() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); final CountDownLatch latch = new CountDownLatch(1); Channel ch = new LocalChannel(loopA); @@ -462,17 +456,14 @@ public class SingleThreadEventLoopTest { @Override protected void run() { - for (;;) { + do { Runnable task = takeTask(); if (task != null) { task.run(); updateLastExecutionTime(); } - if (confirmShutdown()) { - break; - } - } + } while (!confirmShutdown()); } @Override @@ -504,7 +495,7 @@ public class SingleThreadEventLoopTest { @Override protected void run() { - for (;;) { + do { try { Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos(System.nanoTime()))); } catch (InterruptedException e) { @@ -513,10 +504,7 @@ public class SingleThreadEventLoopTest { runAllTasks(Integer.MAX_VALUE); - if (confirmShutdown()) { - break; - } - } + } while (!confirmShutdown()); } @Override diff --git a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java index 8e3c64e927..50b15af00d 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java @@ -98,7 +98,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); group.shutdownGracefully(); } @@ -169,7 +169,6 @@ public class NioEventLoopTest extends AbstractEventLoopTest { } } - @SuppressWarnings("deprecation") @Test public void testTaskRemovalOnShutdownThrowsNoUnsupportedOperationException() throws Exception { final AtomicReference error = new AtomicReference<>(); @@ -191,9 +190,9 @@ public class NioEventLoopTest extends AbstractEventLoopTest { } }); t.start(); - group.shutdownNow(); + Future termination = group.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); t.join(); - group.terminationFuture().syncUninterruptibly(); + termination.syncUninterruptibly(); assertThat(error.get(), instanceOf(RejectedExecutionException.class)); error.set(null); } diff --git a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java index 29345c23ef..2f529462bf 100644 --- a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java +++ b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java @@ -23,7 +23,6 @@ import io.netty.channel.nio.AbstractNioChannel; import io.netty.channel.nio.NioHandler; import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ScheduledFuture; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -108,6 +107,7 @@ public abstract class AbstractNioChannelTest { this.eventLoop = eventLoop; } + @Override @Test public EventLoop next() { return this; @@ -118,11 +118,6 @@ public abstract class AbstractNioChannelTest { return eventLoop.unsafe(); } - @Override - public void shutdown() { - eventLoop.shutdown(); - } - @Override public boolean inEventLoop(Thread thread) { return eventLoop.inEventLoop(thread); @@ -134,12 +129,12 @@ public abstract class AbstractNioChannelTest { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return eventLoop.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return eventLoop.terminationFuture(); } @@ -159,30 +154,30 @@ public abstract class AbstractNioChannelTest { } @Override - public void execute(Runnable command) { - eventLoop.execute(command); + public void execute(Runnable task) { + eventLoop.execute(task); } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return eventLoop.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return eventLoop.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return eventLoop.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return eventLoop.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - return eventLoop.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate( + Runnable task, long initialDelay, long period, TimeUnit unit) { + return eventLoop.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return eventLoop.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { + return eventLoop.scheduleWithFixedDelay(task, initialDelay, delay, unit); } }