Avoid wrapping scheduled Runnables in Callable adapter (#9666)

Motivation

Currently when future tasks are scheduled via schedule(Runnable, ...)
methods, the supplied Runnable is wrapped in a newly allocated Callable
adapter prior to being wrapped in a ScheduledFutureTask.

This can be avoided which saves an object allocation per scheduled task.

Modifications

Change the Callable task field of ScheduledFutureTask to be of type
Object so that it can hold/run Runnables directly in addition to
Callables.

An "adapter" is still used in the case a Runnable is scheduled with an
explicit constant non-null completion value, assumed to be rare.

Result

Less garbage
This commit is contained in:
Nick Hill 2019-10-17 07:01:53 -07:00 committed by Norman Maurer
parent bbc34d0eda
commit 19b4adf79c
4 changed files with 61 additions and 42 deletions

View File

@ -19,10 +19,11 @@ import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PriorityQueue; import io.netty.util.internal.PriorityQueue;
import static io.netty.util.concurrent.ScheduledFutureTask.deadlineNanos;
import java.util.Comparator; import java.util.Comparator;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -169,7 +170,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
validateScheduled0(delay, unit); validateScheduled0(delay, unit);
return schedule(new ScheduledFutureTask<Void>( return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); this, command, deadlineNanos(unit.toNanos(delay))));
} }
@Override @Override
@ -181,8 +182,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
} }
validateScheduled0(delay, unit); validateScheduled0(delay, unit);
return schedule(new ScheduledFutureTask<V>( return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
} }
@Override @Override
@ -201,8 +201,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
validateScheduled0(period, unit); validateScheduled0(period, unit);
return schedule(new ScheduledFutureTask<Void>( return schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(command, null), this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
} }
@Override @Override
@ -222,8 +221,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
validateScheduled0(delay, unit); validateScheduled0(delay, unit);
return schedule(new ScheduledFutureTask<Void>( return schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(command, null), this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")

View File

@ -20,10 +20,6 @@ import java.util.concurrent.RunnableFuture;
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> { class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
static <T> Callable<T> toCallable(Runnable runnable, T result) {
return new RunnableAdapter<T>(runnable, result);
}
private static final class RunnableAdapter<T> implements Callable<T> { private static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task; final Runnable task;
final T result; final T result;
@ -45,21 +41,19 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
} }
private static final Callable<?> COMPLETED = new SentinelCallable<Object>("COMPLETED"); private static final Runnable COMPLETED = new SentinelRunnable("COMPLETED");
private static final Callable<?> CANCELLED = new SentinelCallable<Object>("CANCELLED"); private static final Runnable CANCELLED = new SentinelRunnable("CANCELLED");
private static final Callable<?> FAILED = new SentinelCallable<Object>("FAILED"); private static final Runnable FAILED = new SentinelRunnable("FAILED");
private static class SentinelCallable<T> implements Callable<T> { private static class SentinelRunnable implements Runnable {
private final String name; private final String name;
SentinelCallable(String name) { SentinelRunnable(String name) {
this.name = name; this.name = name;
} }
@Override @Override
public T call() { public void run() { } // no-op
return null;
}
@Override @Override
public String toString() { public String toString() {
@ -67,10 +61,17 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
} }
protected Callable<V> task; // Strictly of type Callable<V> or Runnable
private Object task;
PromiseTask(EventExecutor executor, Runnable runnable, V result) { PromiseTask(EventExecutor executor, Runnable runnable, V result) {
this(executor, toCallable(runnable, result)); super(executor);
task = result == null ? runnable : new RunnableAdapter<V>(runnable, result);
}
PromiseTask(EventExecutor executor, Runnable runnable) {
super(executor);
task = runnable;
} }
PromiseTask(EventExecutor executor, Callable<V> callable) { PromiseTask(EventExecutor executor, Callable<V> callable) {
@ -88,11 +89,21 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
return this == obj; return this == obj;
} }
@SuppressWarnings("unchecked")
final V runTask() throws Exception {
final Object task = this.task;
if (task instanceof Callable) {
return ((Callable<V>) task).call();
}
((Runnable) task).run();
return null;
}
@Override @Override
public void run() { public void run() {
try { try {
if (setUncancellableInternal()) { if (setUncancellableInternal()) {
V result = task.call(); V result = runTask();
setSuccessInternal(result); setSuccessInternal(result);
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -100,14 +111,13 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
} }
@SuppressWarnings("unchecked") private boolean clearTaskAfterCompletion(boolean done, Runnable result) {
private boolean clearTaskAfterCompletion(boolean done, Callable<?> result) {
if (done) { if (done) {
// The only time where it might be possible for the sentinel task // The only time where it might be possible for the sentinel task
// to be called is in the case of a periodic ScheduledFutureTask, // to be called is in the case of a periodic ScheduledFutureTask,
// in which case it's a benign race with cancellation and the (null) // in which case it's a benign race with cancellation and the (null)
// return value is not used. // return value is not used.
task = (Callable<V>) result; task = result;
} }
return done; return done;
} }

View File

@ -51,27 +51,31 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
private int queueIndex = INDEX_NOT_IN_QUEUE; private int queueIndex = INDEX_NOT_IN_QUEUE;
ScheduledFutureTask( ScheduledFutureTask(AbstractScheduledEventExecutor executor,
AbstractScheduledEventExecutor executor, Runnable runnable, long nanoTime) {
Runnable runnable, V result, long nanoTime) {
this(executor, toCallable(runnable, result), nanoTime); super(executor, runnable);
deadlineNanos = nanoTime;
periodNanos = 0;
} }
ScheduledFutureTask( ScheduledFutureTask(AbstractScheduledEventExecutor executor,
AbstractScheduledEventExecutor executor, Runnable runnable, long nanoTime, long period) {
super(executor, runnable);
deadlineNanos = nanoTime;
periodNanos = validatePeriod(period);
}
ScheduledFutureTask(AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime, long period) { Callable<V> callable, long nanoTime, long period) {
super(executor, callable); super(executor, callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
deadlineNanos = nanoTime; deadlineNanos = nanoTime;
periodNanos = period; periodNanos = validatePeriod(period);
} }
ScheduledFutureTask( ScheduledFutureTask(AbstractScheduledEventExecutor executor,
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime) { Callable<V> callable, long nanoTime) {
super(executor, callable); super(executor, callable);
@ -79,6 +83,13 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
periodNanos = 0; periodNanos = 0;
} }
private static long validatePeriod(long period) {
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
return period;
}
ScheduledFutureTask<V> setId(long id) { ScheduledFutureTask<V> setId(long id) {
this.id = id; this.id = id;
return this; return this;
@ -136,13 +147,13 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
try { try {
if (periodNanos == 0) { if (periodNanos == 0) {
if (setUncancellableInternal()) { if (setUncancellableInternal()) {
V result = task.call(); V result = runTask();
setSuccessInternal(result); setSuccessInternal(result);
} }
} else { } else {
// check if is done as it may was cancelled // check if is done as it may was cancelled
if (!isCancelled()) { if (!isCancelled()) {
task.call(); runTask();
if (!executor().isShutdown()) { if (!executor().isShutdown()) {
if (periodNanos > 0) { if (periodNanos > 0) {
deadlineNanos += periodNanos; deadlineNanos += periodNanos;

View File

@ -215,7 +215,7 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable,
RunnableScheduledFuture<V> future) { RunnableScheduledFuture<V> future) {
super(executor, runnable, null); super(executor, runnable);
this.future = future; this.future = future;
} }
@ -232,7 +232,7 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
} else if (!isDone()) { } else if (!isDone()) {
try { try {
// Its a periodic task so we need to ignore the return value // Its a periodic task so we need to ignore the return value
task.call(); runTask();
} catch (Throwable cause) { } catch (Throwable cause) {
if (!tryFailureInternal(cause)) { if (!tryFailureInternal(cause)) {
logger.warn("Failure during execution of task", cause); logger.warn("Failure during execution of task", cause);