Avoid extra Runnable allocs when scheduling tasks outside event loop (#9744)

Motivation

Currently when future tasks are scheduled via EventExecutors from a
different thread, at least two allocations are performed - the
ScheduledFutureTask wrapping the to-be-run task, and a Runnable wrapping
the action to add to the scheduled task priority queue. The latter can
be avoided by incorporating this logic into the former.

Modification

- When scheduling or cancelling a future task from outside the event
loop, enqueue the task itself rather than wrapping in a Runnable
- Have ScheduledFutureTask#run first verify the task's deadline has
passed and if not add or remove it from the scheduledTaskQueue depending
on its cancellation state
- Add new outside-event-loop benchmarks to ScheduleFutureTaskBenchmark

Result

Fewer allocations when scheduling/cancelling future tasks
This commit is contained in:
Nick Hill 2019-11-04 02:57:53 -08:00 committed by Norman Maurer
parent 17bffce90e
commit feb804dca8
4 changed files with 77 additions and 34 deletions

View File

@ -126,21 +126,21 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
protected final Runnable pollScheduledTask(long nanoTime) { protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop(); assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) { if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null; return null;
} }
scheduledTaskQueue.remove(); scheduledTaskQueue.remove();
scheduledTask.setConsumed();
return scheduledTask; return scheduledTask;
} }
/** /**
* Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. * Return the nanoseconds until the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
*/ */
protected final long nextScheduledTaskNano() { protected final long nextScheduledTaskNano() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null ? Math.max(0, scheduledTask.deadlineNanos() - nanoTime()) : -1; return scheduledTask != null ? scheduledTask.delayNanos() : -1;
} }
/** /**
@ -244,21 +244,21 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
// NOOP // NOOP
} }
final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
// nextTaskId a long and so there is no chance it will overflow back to 0
scheduledTaskQueue().add(task.setId(++nextTaskId));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) { if (inEventLoop()) {
scheduledTaskQueue().add(task.setId(nextTaskId++)); scheduleFromEventLoop(task);
} else { } else {
final long deadlineNanos = task.deadlineNanos(); final long deadlineNanos = task.deadlineNanos();
final Runnable addToQueue = new Runnable() { // task will add itself to scheduled task queue when run if not expired
@Override
public void run() {
scheduledTaskQueue().add(task.setId(nextTaskId++));
}
};
if (beforeScheduledTaskSubmitted(deadlineNanos)) { if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(addToQueue); execute(task);
} else { } else {
lazyExecute(addToQueue); lazyExecute(task);
// Second hook after scheduling to facilitate race-avoidance // Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) { if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK); execute(WAKEUP_TASK);
@ -270,15 +270,12 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
} }
final void removeScheduled(final ScheduledFutureTask<?> task) { final void removeScheduled(final ScheduledFutureTask<?> task) {
assert task.isCancelled();
if (inEventLoop()) { if (inEventLoop()) {
scheduledTaskQueue().removeTyped(task); scheduledTaskQueue().removeTyped(task);
} else { } else {
lazyExecute(new Runnable() { // task will remove itself from scheduled task queue when it runs
@Override lazyExecute(task);
public void run() {
scheduledTaskQueue().removeTyped(task);
}
});
} }
} }

View File

@ -176,6 +176,11 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
return clearTaskAfterCompletion(super.cancel(mayInterruptIfRunning), CANCELLED); return clearTaskAfterCompletion(super.cancel(mayInterruptIfRunning), CANCELLED);
} }
@Override
public final boolean isCancelled() {
return task == CANCELLED || super.isCancelled();
}
@Override @Override
protected StringBuilder toStringBuilder() { protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder(); StringBuilder buf = super.toStringBuilder();

View File

@ -19,7 +19,6 @@ package io.netty.util.concurrent;
import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.PriorityQueueNode; import io.netty.util.internal.PriorityQueueNode;
import java.util.Queue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -91,7 +90,9 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
} }
ScheduledFutureTask<V> setId(long id) { ScheduledFutureTask<V> setId(long id) {
if (this.id == 0L) {
this.id = id; this.id = id;
}
return this; return this;
} }
@ -104,16 +105,26 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
return deadlineNanos; return deadlineNanos;
} }
void setConsumed() {
// Optimization to avoid checking system clock again
// after deadline has passed and task has been dequeued
if (periodNanos == 0) {
assert nanoTime() > deadlineNanos;
deadlineNanos = 0L;
}
}
public long delayNanos() { public long delayNanos() {
return deadlineToDelayNanos(deadlineNanos()); return deadlineToDelayNanos(deadlineNanos());
} }
static long deadlineToDelayNanos(long deadlineNanos) { static long deadlineToDelayNanos(long deadlineNanos) {
return Math.max(0, deadlineNanos - nanoTime()); return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
} }
public long delayNanos(long currentTimeNanos) { public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME)); return deadlineNanos == 0L ? 0L
: Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME));
} }
@Override @Override
@ -145,6 +156,15 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
public void run() { public void run() {
assert executor().inEventLoop(); assert executor().inEventLoop();
try { try {
if (delayNanos() > 0L) {
// Not yet expired, need to add or remove from queue
if (isCancelled()) {
scheduledExecutor().scheduledTaskQueue().removeTyped(this);
} else {
scheduledExecutor().scheduleFromEventLoop(this);
}
return;
}
if (periodNanos == 0) { if (periodNanos == 0) {
if (setUncancellableInternal()) { if (setUncancellableInternal()) {
V result = runTask(); V result = runTask();
@ -161,11 +181,7 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
deadlineNanos = nanoTime() - periodNanos; deadlineNanos = nanoTime() - periodNanos;
} }
if (!isCancelled()) { if (!isCancelled()) {
// scheduledTaskQueue can never be null as we lazy init it before submit the task! scheduledExecutor().scheduledTaskQueue().add(this);
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
} }
} }
} }
@ -175,6 +191,10 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
} }
} }
private AbstractScheduledEventExecutor scheduledExecutor() {
return (AbstractScheduledEventExecutor) executor();
}
/** /**
* {@inheritDoc} * {@inheritDoc}
* *
@ -184,7 +204,7 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
boolean canceled = super.cancel(mayInterruptIfRunning); boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled) { if (canceled) {
((AbstractScheduledEventExecutor) executor()).removeScheduled(this); scheduledExecutor().removeScheduled(this);
} }
return canceled; return canceled;
} }

View File

@ -29,11 +29,11 @@ import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.annotations.Warmup;
import io.netty.channel.DefaultEventLoop; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.microbench.util.AbstractMicrobenchmark;
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) @Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS)
@State(Scope.Benchmark) @State(Scope.Benchmark)
public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark { public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark {
@ -54,7 +54,7 @@ public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark {
@Setup(Level.Trial) @Setup(Level.Trial)
public void reset() { public void reset() {
eventLoop = new DefaultEventLoop(); eventLoop = (AbstractScheduledEventExecutor) new NioEventLoopGroup(1).next();
} }
@Setup(Level.Invocation) @Setup(Level.Invocation)
@ -69,7 +69,8 @@ public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark {
@TearDown(Level.Trial) @TearDown(Level.Trial)
public void shutdown() { public void shutdown() {
eventLoop.shutdownGracefully().awaitUninterruptibly(); clear();
eventLoop.parent().shutdownGracefully().awaitUninterruptibly();
} }
} }
@ -85,4 +86,24 @@ public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark {
} }
}).syncUninterruptibly(); }).syncUninterruptibly();
} }
@Benchmark
@Threads(1)
public Future<?> scheduleLotsOutsideLoop(final ThreadState threadState) {
final AbstractScheduledEventExecutor eventLoop = threadState.eventLoop;
for (int i = 1; i <= threadState.num; i++) {
eventLoop.schedule(NO_OP, i, TimeUnit.HOURS);
}
return null;
}
@Benchmark
@Threads(1)
public Future<?> scheduleCancelLotsOutsideLoop(final ThreadState threadState) {
final AbstractScheduledEventExecutor eventLoop = threadState.eventLoop;
for (int i = 1; i <= threadState.num; i++) {
eventLoop.schedule(NO_OP, i, TimeUnit.HOURS).cancel(false);
}
return null;
}
} }