Ensure cancelled scheduled tasks can be GC'ed ASAP

Motivation:

Prior we used a purge task that would remove previous canceled scheduled tasks from the internal queue. This could introduce some delay and so use a lot of memory even if the task itself is already canceled.

Modifications:

Schedule removal of task from queue via EventLoop if cancel operation is not done in the EventLoop Thread or just remove directly if the Thread that cancels the scheduled task is in the EventLoop.

Result:

Faster possibility to GC a canceled ScheduledFutureTask.
This commit is contained in:
Norman Maurer 2015-07-18 20:50:34 +02:00
parent ae32fd4f0b
commit 36c80cd818
4 changed files with 37 additions and 39 deletions

View File

@ -16,8 +16,8 @@
package io.netty.util.concurrent; package io.netty.util.concurrent;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.OneTimeTask;
import java.util.Iterator;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -62,7 +62,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]); scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
for (ScheduledFutureTask<?> task: scheduledTasks) { for (ScheduledFutureTask<?> task: scheduledTasks) {
task.cancel(false); task.cancelWithoutRemove(false);
} }
scheduledTaskQueue.clear(); scheduledTaskQueue.clear();
@ -188,7 +188,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
if (inEventLoop()) { if (inEventLoop()) {
scheduledTaskQueue().add(task); scheduledTaskQueue().add(task);
} else { } else {
execute(new Runnable() { execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
scheduledTaskQueue().add(task); scheduledTaskQueue().add(task);
@ -199,17 +199,16 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
return task; return task;
} }
void purgeCancelledScheduledTasks() { final void removeScheduled(final ScheduledFutureTask<?> task) {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; if (inEventLoop()) {
if (isNullOrEmpty(scheduledTaskQueue)) { scheduledTaskQueue().remove(task);
return; } else {
} execute(new OneTimeTask() {
Iterator<ScheduledFutureTask<?>> i = scheduledTaskQueue.iterator(); @Override
while (i.hasNext()) { public void run() {
ScheduledFutureTask<?> task = i.next(); removeScheduled(task);
if (task.isCancelled()) {
i.remove();
} }
});
} }
} }
} }

View File

@ -36,14 +36,18 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(); final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>( final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new PurgeTask(), null), this, Executors.<Void>callable(new Runnable() {
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL); @Override
public void run() {
// NOOP
}
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass()); private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
private final TaskRunner taskRunner = new TaskRunner(); private final TaskRunner taskRunner = new TaskRunner();
@ -53,7 +57,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException()); private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
private GlobalEventExecutor() { private GlobalEventExecutor() {
scheduledTaskQueue().add(purgeTask); scheduledTaskQueue().add(quietPeriodTask);
} }
@Override @Override
@ -231,13 +235,13 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
logger.warn("Unexpected exception from the global event executor: ", t); logger.warn("Unexpected exception from the global event executor: ", t);
} }
if (task != purgeTask) { if (task != quietPeriodTask) {
continue; continue;
} }
} }
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue; Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
// Terminate if there is no task in the queue (except the purge task). // Terminate if there is no task in the queue (except the noop task).
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) { if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
// Mark the current thread as stopped. // Mark the current thread as stopped.
// The following CAS must always success and must be uncontended, // The following CAS must always success and must be uncontended,
@ -268,11 +272,4 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
} }
} }
} }
private final class PurgeTask implements Runnable {
@Override
public void run() {
purgeCancelledScheduledTasks();
}
}
} }

View File

@ -146,6 +146,19 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
} }
} }
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled) {
((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
}
return canceled;
}
boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
return super.cancel(mayInterruptIfRunning);
}
@Override @Override
protected StringBuilder toStringBuilder() { protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder(); StringBuilder buf = super.toStringBuilder();

View File

@ -25,7 +25,6 @@ import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
@ -714,18 +713,8 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private void startThread() { private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
thread.start(); thread.start();
} }
} }
} }
private final class PurgeTask implements Runnable {
@Override
public void run() {
purgeCancelledScheduledTasks();
}
}
} }