From 36c80cd81819436cf7b40ae305f7db837aca30d7 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 18 Jul 2015 20:50:34 +0200 Subject: [PATCH] Ensure cancelled scheduled tasks can be GC'ed ASAP Motivation: Prior we used a purge task that would remove previous canceled scheduled tasks from the internal queue. This could introduce some delay and so use a lot of memory even if the task itself is already canceled. Modifications: Schedule removal of task from queue via EventLoop if cancel operation is not done in the EventLoop Thread or just remove directly if the Thread that cancels the scheduled task is in the EventLoop. Result: Faster possibility to GC a canceled ScheduledFutureTask. --- .../AbstractScheduledEventExecutor.java | 27 +++++++++---------- .../util/concurrent/GlobalEventExecutor.java | 25 ++++++++--------- .../util/concurrent/ScheduledFutureTask.java | 13 +++++++++ .../concurrent/SingleThreadEventExecutor.java | 11 -------- 4 files changed, 37 insertions(+), 39 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 2953398769..e5a5a8350d 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -16,8 +16,8 @@ package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.OneTimeTask; -import java.util.Iterator; import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.Callable; @@ -62,7 +62,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut scheduledTaskQueue.toArray(new ScheduledFutureTask[scheduledTaskQueue.size()]); for (ScheduledFutureTask task: scheduledTasks) { - task.cancel(false); + task.cancelWithoutRemove(false); } scheduledTaskQueue.clear(); @@ -188,7 +188,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut if (inEventLoop()) { scheduledTaskQueue().add(task); } else { - execute(new Runnable() { + execute(new OneTimeTask() { @Override public void run() { scheduledTaskQueue().add(task); @@ -199,17 +199,16 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut return task; } - void purgeCancelledScheduledTasks() { - Queue> scheduledTaskQueue = this.scheduledTaskQueue; - if (isNullOrEmpty(scheduledTaskQueue)) { - return; - } - Iterator> i = scheduledTaskQueue.iterator(); - while (i.hasNext()) { - ScheduledFutureTask task = i.next(); - if (task.isCancelled()) { - i.remove(); - } + final void removeScheduled(final ScheduledFutureTask task) { + if (inEventLoop()) { + scheduledTaskQueue().remove(task); + } else { + execute(new OneTimeTask() { + @Override + public void run() { + removeScheduled(task); + } + }); } } } diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index eea6d45aef..5f18ed8dda 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -36,14 +36,18 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class); - private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); + private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1); public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); final BlockingQueue taskQueue = new LinkedBlockingQueue(); - final ScheduledFutureTask purgeTask = new ScheduledFutureTask( - this, Executors.callable(new PurgeTask(), null), - ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL); + final ScheduledFutureTask quietPeriodTask = new ScheduledFutureTask( + this, Executors.callable(new Runnable() { + @Override + public void run() { + // NOOP + } + }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass()); private final TaskRunner taskRunner = new TaskRunner(); @@ -53,7 +57,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { private final Future terminationFuture = new FailedFuture(this, new UnsupportedOperationException()); private GlobalEventExecutor() { - scheduledTaskQueue().add(purgeTask); + scheduledTaskQueue().add(quietPeriodTask); } @Override @@ -231,13 +235,13 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { logger.warn("Unexpected exception from the global event executor: ", t); } - if (task != purgeTask) { + if (task != quietPeriodTask) { continue; } } Queue> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue; - // Terminate if there is no task in the queue (except the purge task). + // Terminate if there is no task in the queue (except the noop task). if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) { // Mark the current thread as stopped. // The following CAS must always success and must be uncontended, @@ -268,11 +272,4 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { } } } - - private final class PurgeTask implements Runnable { - @Override - public void run() { - purgeCancelledScheduledTasks(); - } - } } diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index e605206b6c..a7a3533d34 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -146,6 +146,19 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu } } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean canceled = super.cancel(mayInterruptIfRunning); + if (canceled) { + ((AbstractScheduledEventExecutor) executor()).removeScheduled(this); + } + return canceled; + } + + boolean cancelWithoutRemove(boolean mayInterruptIfRunning) { + return super.cancel(mayInterruptIfRunning); + } + @Override protected StringBuilder toStringBuilder() { StringBuilder buf = super.toStringBuilder(); diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 03f1fb183a..8e8e9d1139 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; @@ -714,18 +713,8 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { - schedule(new ScheduledFutureTask( - this, Executors.callable(new PurgeTask(), null), - ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); thread.start(); } } } - - private final class PurgeTask implements Runnable { - @Override - public void run() { - purgeCancelledScheduledTasks(); - } - } }