From feb804dca8579c93ff3224564aafcf5938bb2c52 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Mon, 4 Nov 2019 02:57:53 -0800 Subject: [PATCH] Avoid extra Runnable allocs when scheduling tasks outside event loop (#9744) Motivation Currently when future tasks are scheduled via EventExecutors from a different thread, at least two allocations are performed - the ScheduledFutureTask wrapping the to-be-run task, and a Runnable wrapping the action to add to the scheduled task priority queue. The latter can be avoided by incorporating this logic into the former. Modification - When scheduling or cancelling a future task from outside the event loop, enqueue the task itself rather than wrapping in a Runnable - Have ScheduledFutureTask#run first verify the task's deadline has passed and if not add or remove it from the scheduledTaskQueue depending on its cancellation state - Add new outside-event-loop benchmarks to ScheduleFutureTaskBenchmark Result Fewer allocations when scheduling/cancelling future tasks --- .../AbstractScheduledEventExecutor.java | 35 ++++++++-------- .../io/netty/util/concurrent/PromiseTask.java | 5 +++ .../util/concurrent/ScheduledFutureTask.java | 40 ++++++++++++++----- .../ScheduleFutureTaskBenchmark.java | 31 +++++++++++--- 4 files changed, 77 insertions(+), 34 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index c132aa5b20..d51357b77b 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -126,21 +126,21 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); - Queue> scheduledTaskQueue = this.scheduledTaskQueue; - ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + ScheduledFutureTask scheduledTask = peekScheduledTask(); if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) { return null; } scheduledTaskQueue.remove(); + scheduledTask.setConsumed(); return scheduledTask; } /** - * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. + * Return the nanoseconds until the next scheduled task is ready to be run or {@code -1} if no task is scheduled. */ protected final long nextScheduledTaskNano() { ScheduledFutureTask scheduledTask = peekScheduledTask(); - return scheduledTask != null ? Math.max(0, scheduledTask.deadlineNanos() - nanoTime()) : -1; + return scheduledTask != null ? scheduledTask.delayNanos() : -1; } /** @@ -244,21 +244,21 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut // NOOP } + final void scheduleFromEventLoop(final ScheduledFutureTask task) { + // nextTaskId a long and so there is no chance it will overflow back to 0 + scheduledTaskQueue().add(task.setId(++nextTaskId)); + } + private ScheduledFuture schedule(final ScheduledFutureTask task) { if (inEventLoop()) { - scheduledTaskQueue().add(task.setId(nextTaskId++)); + scheduleFromEventLoop(task); } else { final long deadlineNanos = task.deadlineNanos(); - final Runnable addToQueue = new Runnable() { - @Override - public void run() { - scheduledTaskQueue().add(task.setId(nextTaskId++)); - } - }; + // task will add itself to scheduled task queue when run if not expired if (beforeScheduledTaskSubmitted(deadlineNanos)) { - execute(addToQueue); + execute(task); } else { - lazyExecute(addToQueue); + lazyExecute(task); // Second hook after scheduling to facilitate race-avoidance if (afterScheduledTaskSubmitted(deadlineNanos)) { execute(WAKEUP_TASK); @@ -270,15 +270,12 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } final void removeScheduled(final ScheduledFutureTask task) { + assert task.isCancelled(); if (inEventLoop()) { scheduledTaskQueue().removeTyped(task); } else { - lazyExecute(new Runnable() { - @Override - public void run() { - scheduledTaskQueue().removeTyped(task); - } - }); + // task will remove itself from scheduled task queue when it runs + lazyExecute(task); } } diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java index 0025815c52..fbd8f2694d 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java @@ -176,6 +176,11 @@ class PromiseTask extends DefaultPromise implements RunnableFuture { return clearTaskAfterCompletion(super.cancel(mayInterruptIfRunning), CANCELLED); } + @Override + public final boolean isCancelled() { + return task == CANCELLED || super.isCancelled(); + } + @Override protected StringBuilder toStringBuilder() { StringBuilder buf = super.toStringBuilder(); diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index 5ffe824119..023c498f34 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -19,7 +19,6 @@ package io.netty.util.concurrent; import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.PriorityQueueNode; -import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @@ -91,7 +90,9 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu } ScheduledFutureTask setId(long id) { - this.id = id; + if (this.id == 0L) { + this.id = id; + } return this; } @@ -104,16 +105,26 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu return deadlineNanos; } + void setConsumed() { + // Optimization to avoid checking system clock again + // after deadline has passed and task has been dequeued + if (periodNanos == 0) { + assert nanoTime() > deadlineNanos; + deadlineNanos = 0L; + } + } + public long delayNanos() { return deadlineToDelayNanos(deadlineNanos()); } static long deadlineToDelayNanos(long deadlineNanos) { - return Math.max(0, deadlineNanos - nanoTime()); + return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime()); } public long delayNanos(long currentTimeNanos) { - return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME)); + return deadlineNanos == 0L ? 0L + : Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME)); } @Override @@ -145,6 +156,15 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu public void run() { assert executor().inEventLoop(); try { + if (delayNanos() > 0L) { + // Not yet expired, need to add or remove from queue + if (isCancelled()) { + scheduledExecutor().scheduledTaskQueue().removeTyped(this); + } else { + scheduledExecutor().scheduleFromEventLoop(this); + } + return; + } if (periodNanos == 0) { if (setUncancellableInternal()) { V result = runTask(); @@ -161,11 +181,7 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu deadlineNanos = nanoTime() - periodNanos; } if (!isCancelled()) { - // scheduledTaskQueue can never be null as we lazy init it before submit the task! - Queue> scheduledTaskQueue = - ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue; - assert scheduledTaskQueue != null; - scheduledTaskQueue.add(this); + scheduledExecutor().scheduledTaskQueue().add(this); } } } @@ -175,6 +191,10 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu } } + private AbstractScheduledEventExecutor scheduledExecutor() { + return (AbstractScheduledEventExecutor) executor(); + } + /** * {@inheritDoc} * @@ -184,7 +204,7 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu public boolean cancel(boolean mayInterruptIfRunning) { boolean canceled = super.cancel(mayInterruptIfRunning); if (canceled) { - ((AbstractScheduledEventExecutor) executor()).removeScheduled(this); + scheduledExecutor().removeScheduled(this); } return canceled; } diff --git a/microbench/src/main/java/io/netty/util/concurrent/ScheduleFutureTaskBenchmark.java b/microbench/src/main/java/io/netty/util/concurrent/ScheduleFutureTaskBenchmark.java index 72bc71ea22..d5fdd5bd17 100644 --- a/microbench/src/main/java/io/netty/util/concurrent/ScheduleFutureTaskBenchmark.java +++ b/microbench/src/main/java/io/netty/util/concurrent/ScheduleFutureTaskBenchmark.java @@ -29,11 +29,11 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; -import io.netty.channel.DefaultEventLoop; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.microbench.util.AbstractMicrobenchmark; -@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS) @State(Scope.Benchmark) public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark { @@ -54,7 +54,7 @@ public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark { @Setup(Level.Trial) public void reset() { - eventLoop = new DefaultEventLoop(); + eventLoop = (AbstractScheduledEventExecutor) new NioEventLoopGroup(1).next(); } @Setup(Level.Invocation) @@ -69,7 +69,8 @@ public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark { @TearDown(Level.Trial) public void shutdown() { - eventLoop.shutdownGracefully().awaitUninterruptibly(); + clear(); + eventLoop.parent().shutdownGracefully().awaitUninterruptibly(); } } @@ -85,4 +86,24 @@ public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark { } }).syncUninterruptibly(); } + + @Benchmark + @Threads(1) + public Future scheduleLotsOutsideLoop(final ThreadState threadState) { + final AbstractScheduledEventExecutor eventLoop = threadState.eventLoop; + for (int i = 1; i <= threadState.num; i++) { + eventLoop.schedule(NO_OP, i, TimeUnit.HOURS); + } + return null; + } + + @Benchmark + @Threads(1) + public Future scheduleCancelLotsOutsideLoop(final ThreadState threadState) { + final AbstractScheduledEventExecutor eventLoop = threadState.eventLoop; + for (int i = 1; i <= threadState.num; i++) { + eventLoop.schedule(NO_OP, i, TimeUnit.HOURS).cancel(false); + } + return null; + } }