Ensure cancelled scheduled tasks can be GC'ed ASAP
Motivation: Prior we used a purge task that would remove previous canceled scheduled tasks from the internal queue. This could introduce some delay and so use a lot of memory even if the task itself is already canceled. Modifications: Schedule removal of task from queue via EventLoop if cancel operation is not done in the EventLoop Thread or just remove directly if the Thread that cancels the scheduled task is in the EventLoop. Result: Faster possibility to GC a canceled ScheduledFutureTask.
This commit is contained in:
parent
d1344345bb
commit
05dae57ad7
@ -16,8 +16,8 @@
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Callable;
|
||||
@ -69,7 +69,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
|
||||
scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
|
||||
|
||||
for (ScheduledFutureTask<?> task: scheduledTasks) {
|
||||
task.cancel(false);
|
||||
task.cancelWithoutRemove(false);
|
||||
}
|
||||
|
||||
scheduledTaskQueue.clear();
|
||||
@ -195,7 +195,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
|
||||
if (inEventLoop()) {
|
||||
scheduledTaskQueue().add(task);
|
||||
} else {
|
||||
execute(new Runnable() {
|
||||
execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
scheduledTaskQueue().add(task);
|
||||
@ -206,17 +206,16 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
|
||||
return task;
|
||||
}
|
||||
|
||||
void purgeCancelledScheduledTasks() {
|
||||
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
|
||||
if (isNullOrEmpty(scheduledTaskQueue)) {
|
||||
return;
|
||||
}
|
||||
Iterator<ScheduledFutureTask<?>> i = scheduledTaskQueue.iterator();
|
||||
while (i.hasNext()) {
|
||||
ScheduledFutureTask<?> task = i.next();
|
||||
if (task.isCancelled()) {
|
||||
i.remove();
|
||||
}
|
||||
final void removeScheduled(final ScheduledFutureTask<?> task) {
|
||||
if (inEventLoop()) {
|
||||
scheduledTaskQueue().remove(task);
|
||||
} else {
|
||||
execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
removeScheduled(task);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -36,14 +36,18 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
|
||||
|
||||
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||
private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||
|
||||
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
|
||||
|
||||
final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
|
||||
final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>(
|
||||
this, Executors.<Void>callable(new PurgeTask(), null),
|
||||
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL);
|
||||
final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
|
||||
this, Executors.<Void>callable(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// NOOP
|
||||
}
|
||||
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
|
||||
|
||||
private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
|
||||
private final TaskRunner taskRunner = new TaskRunner();
|
||||
@ -53,7 +57,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
|
||||
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
|
||||
|
||||
private GlobalEventExecutor() {
|
||||
scheduledTaskQueue().add(purgeTask);
|
||||
scheduledTaskQueue().add(quietPeriodTask);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -227,13 +231,13 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
|
||||
logger.warn("Unexpected exception from the global event executor: ", t);
|
||||
}
|
||||
|
||||
if (task != purgeTask) {
|
||||
if (task != quietPeriodTask) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
|
||||
// Terminate if there is no task in the queue (except the purge task).
|
||||
// Terminate if there is no task in the queue (except the noop task).
|
||||
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
|
||||
// Mark the current thread as stopped.
|
||||
// The following CAS must always success and must be uncontended,
|
||||
@ -264,11 +268,4 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class PurgeTask implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
purgeCancelledScheduledTasks();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -146,6 +146,19 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
boolean canceled = super.cancel(mayInterruptIfRunning);
|
||||
if (canceled) {
|
||||
((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
|
||||
}
|
||||
return canceled;
|
||||
}
|
||||
|
||||
boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
|
||||
return super.cancel(mayInterruptIfRunning);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StringBuilder toStringBuilder() {
|
||||
StringBuilder buf = super.toStringBuilder();
|
||||
|
@ -26,7 +26,6 @@ import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.Semaphore;
|
||||
@ -679,9 +678,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
||||
private void startThread() {
|
||||
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
|
||||
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
|
||||
schedule(new ScheduledFutureTask<Void>(
|
||||
this, Executors.<Void>callable(new PurgeTask(), null),
|
||||
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
|
||||
doStartThread();
|
||||
}
|
||||
}
|
||||
@ -746,11 +742,4 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private final class PurgeTask implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
purgeCancelledScheduledTasks();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user