diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java new file mode 100644 index 0000000000..ff4028c19d --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -0,0 +1,222 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import io.netty.util.internal.ObjectUtil; + +import java.util.Iterator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Abstract base class for {@link EventExecutor}s that want to support scheduling. + */ +public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { + + Queue> scheduledTaskQueue; + + protected AbstractScheduledEventExecutor() { + } + + protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { + super(parent); + } + + protected static long nanoTime() { + return ScheduledFutureTask.nanoTime(); + } + + Queue> scheduledTaskQueue() { + if (scheduledTaskQueue == null) { + scheduledTaskQueue = new PriorityQueue>(); + } + return scheduledTaskQueue; + } + + private static boolean isNullOrEmpty(Queue> queue) { + return queue == null || queue.isEmpty(); + } + + /** + * Cancel all scheduled tasks. + * + * This method MUST be called only when {@link #inEventLoop()} is {@code true}. + */ + protected void cancelScheduledTasks() { + assert inEventLoop(); + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + if (isNullOrEmpty(scheduledTaskQueue)) { + return; + } + + final ScheduledFutureTask[] scheduledTasks = + scheduledTaskQueue.toArray(new ScheduledFutureTask[scheduledTaskQueue.size()]); + + for (ScheduledFutureTask task: scheduledTasks) { + task.cancel(false); + } + + scheduledTaskQueue.clear(); + } + + /** + * @see {@link #pollScheduledTask(long)} + */ + protected final Runnable pollScheduledTask() { + return pollScheduledTask(nanoTime()); + } + + /** + * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. + * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}. + */ + protected final Runnable pollScheduledTask(long nanoTime) { + assert inEventLoop(); + + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + if (scheduledTask == null) { + return null; + } + + if (scheduledTask.deadlineNanos() <= nanoTime) { + scheduledTaskQueue.remove(); + return scheduledTask; + } + return null; + } + + /** + * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. + */ + protected final long nextScheduledTaskNano() { + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + if (scheduledTask == null) { + return -1; + } + return Math.max(0, scheduledTask.deadlineNanos() - nanoTime()); + } + + final ScheduledFutureTask peekScheduledTask() { + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + if (scheduledTaskQueue == null) { + return null; + } + return scheduledTaskQueue.peek(); + } + + /** + * Returns {@code true} if a scheduled task is ready for processing. + */ + protected final boolean hasScheduledTasks() { + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + ObjectUtil.checkNotNull(command, "command"); + ObjectUtil.checkNotNull(unit, "unit"); + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask( + this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + ObjectUtil.checkNotNull(callable, "callable"); + ObjectUtil.checkNotNull(unit, "unit"); + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask( + this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + ObjectUtil.checkNotNull(command, "command"); + ObjectUtil.checkNotNull(unit, "unit"); + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (period <= 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: > 0)", period)); + } + + return schedule(new ScheduledFutureTask( + this, Executors.callable(command, null), + ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + ObjectUtil.checkNotNull(command, "command"); + ObjectUtil.checkNotNull(unit, "unit"); + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (delay <= 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: > 0)", delay)); + } + + return schedule(new ScheduledFutureTask( + this, Executors.callable(command, null), + ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + } + + ScheduledFuture schedule(final ScheduledFutureTask task) { + if (inEventLoop()) { + scheduledTaskQueue().add(task); + } else { + execute(new Runnable() { + @Override + public void run() { + scheduledTaskQueue().add(task); + } + }); + } + + return task; + } + + void purgeCancelledScheduledTasks() { + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + if (isNullOrEmpty(scheduledTaskQueue)) { + return; + } + Iterator> i = scheduledTaskQueue.iterator(); + while (i.hasNext()) { + ScheduledFutureTask task = i.next(); + if (task.isCancelled()) { + i.remove(); + } + } + } +} diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index b9227d7913..0c52d530ef 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -18,11 +18,8 @@ package io.netty.util.concurrent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.Iterator; -import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -35,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to * this executor; use a dedicated executor. */ -public final class GlobalEventExecutor extends AbstractEventExecutor { +public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class); @@ -44,9 +41,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); final BlockingQueue taskQueue = new LinkedBlockingQueue(); - final Queue> delayedTaskQueue = new PriorityQueue>(); final ScheduledFutureTask purgeTask = new ScheduledFutureTask( - this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), + this, Executors.callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL); private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass()); @@ -57,7 +53,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { private final Future terminationFuture = new FailedFuture(this, new UnsupportedOperationException()); private GlobalEventExecutor() { - delayedTaskQueue.add(purgeTask); + scheduledTaskQueue().add(purgeTask); } /** @@ -68,8 +64,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { Runnable takeTask() { BlockingQueue taskQueue = this.taskQueue; for (;;) { - ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); - if (delayedTask == null) { + ScheduledFutureTask scheduledTask = peekScheduledTask(); + if (scheduledTask == null) { Runnable task = null; try { task = taskQueue.take(); @@ -78,7 +74,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { } return task; } else { - long delayNanos = delayedTask.delayNanos(); + long delayNanos = scheduledTask.delayNanos(); Runnable task; if (delayNanos > 0) { try { @@ -92,7 +88,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { } if (task == null) { - fetchFromDelayedQueue(); + fetchFromScheduledTaskQueue(); task = taskQueue.poll(); } @@ -103,23 +99,15 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { } } - private void fetchFromDelayedQueue() { - long nanoTime = 0L; - for (;;) { - ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); - if (delayedTask == null) { - break; - } - - if (nanoTime == 0L) { - nanoTime = ScheduledFutureTask.nanoTime(); - } - - if (delayedTask.deadlineNanos() <= nanoTime) { - delayedTaskQueue.remove(); - taskQueue.add(delayedTask); - } else { - break; + private void fetchFromScheduledTaskQueue() { + if (hasScheduledTasks()) { + long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + for (;;) { + Runnable scheduledTask = pollScheduledTask(nanoTime); + if (scheduledTask == null) { + break; + } + taskQueue.add(scheduledTask); } } } @@ -219,103 +207,6 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { } } - // ScheduledExecutorService implementation - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - if (callable == null) { - throw new NullPointerException("callable"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (period <= 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: > 0)", period)); - } - - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, Executors.callable(command, null), - ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (delay <= 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: > 0)", delay)); - } - - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, Executors.callable(command, null), - ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); - } - - private ScheduledFuture schedule(final ScheduledFutureTask task) { - if (task == null) { - throw new NullPointerException("task"); - } - - if (inEventLoop()) { - delayedTaskQueue.add(task); - } else { - execute(new Runnable() { - @Override - public void run() { - delayedTaskQueue.add(task); - } - }); - } - - return task; - } - private void startThread() { if (started.compareAndSet(false, true)) { Thread t = threadFactory.newThread(taskRunner); @@ -341,8 +232,9 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { } } + Queue> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue; // Terminate if there is no task in the queue (except the purge task). - if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) { + if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) { // Mark the current thread as stopped. // The following CAS must always success and must be uncontended, // because only one thread should be running at the same time. @@ -350,7 +242,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { assert stopped; // Check if there are pending entries added by execute() or schedule*() while we do CAS above. - if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) { + if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) { // A) No new task was added and thus there's nothing to handle // -> safe to terminate because there's nothing left to do // B) A new thread started and handled all the new tasks. @@ -376,13 +268,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { private final class PurgeTask implements Runnable { @Override public void run() { - Iterator> i = delayedTaskQueue.iterator(); - while (i.hasNext()) { - ScheduledFutureTask task = i.next(); - if (task.isCancelled()) { - i.remove(); - } - } + purgeCancelledScheduledTasks(); } } } diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index d6bfb298c2..e605206b6c 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -36,37 +36,34 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu } private final long id = nextTaskId.getAndIncrement(); - private final Queue> delayedTaskQueue; private long deadlineNanos; /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ private final long periodNanos; ScheduledFutureTask( - EventExecutor executor, Queue> delayedTaskQueue, + AbstractScheduledEventExecutor executor, Runnable runnable, V result, long nanoTime) { - this(executor, delayedTaskQueue, toCallable(runnable, result), nanoTime); + this(executor, toCallable(runnable, result), nanoTime); } ScheduledFutureTask( - EventExecutor executor, Queue> delayedTaskQueue, + AbstractScheduledEventExecutor executor, Callable callable, long nanoTime, long period) { super(executor, callable); if (period == 0) { throw new IllegalArgumentException("period: 0 (expected: != 0)"); } - this.delayedTaskQueue = delayedTaskQueue; deadlineNanos = nanoTime; periodNanos = period; } ScheduledFutureTask( - EventExecutor executor, Queue> delayedTaskQueue, + AbstractScheduledEventExecutor executor, Callable callable, long nanoTime) { super(executor, callable); - this.delayedTaskQueue = delayedTaskQueue; deadlineNanos = nanoTime; periodNanos = 0; } @@ -135,7 +132,11 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu deadlineNanos = nanoTime() - p; } if (!isCancelled()) { - delayedTaskQueue.add(this); + // scheduledTaskQueue can never be null as we lazy init it before submit the task! + Queue> scheduledTaskQueue = + ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue; + assert scheduledTaskQueue != null; + scheduledTaskQueue.add(this); } } } diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index c5105786c8..88f3207353 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -20,14 +20,11 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.ArrayList; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; -import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -41,7 +38,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; * Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread. * */ -public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { +public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); @@ -71,11 +68,11 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } private final Queue taskQueue; - final Queue> delayedTaskQueue = new PriorityQueue>(); private volatile Thread thread; private final Executor executor; private volatile boolean interrupted; + private final Semaphore threadLock = new Semaphore(0); private final Set shutdownHooks = new LinkedHashSet(); private final boolean addTaskWakesUp; @@ -177,8 +174,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { BlockingQueue taskQueue = (BlockingQueue) this.taskQueue; for (;;) { - ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); - if (delayedTask == null) { + ScheduledFutureTask scheduledTask = peekScheduledTask(); + if (scheduledTask == null) { Runnable task = null; try { task = taskQueue.take(); @@ -190,7 +187,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } return task; } else { - long delayNanos = delayedTask.delayNanos(); + long delayNanos = scheduledTask.delayNanos(); Runnable task = null; if (delayNanos > 0) { try { @@ -201,11 +198,11 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } } if (task == null) { - // We need to fetch the delayed tasks now as otherwise there may be a chance that - // delayed tasks are never executed if there is always one task in the taskQueue. + // We need to fetch the scheduled tasks now as otherwise there may be a chance that + // scheduled tasks are never executed if there is always one task in the taskQueue. // This is for example true for the read task of OIO Transport // See https://github.com/netty/netty/issues/1614 - fetchFromDelayedQueue(); + fetchFromScheduledTaskQueue(); task = taskQueue.poll(); } @@ -216,23 +213,15 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } } - private void fetchFromDelayedQueue() { - long nanoTime = 0L; - for (;;) { - ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); - if (delayedTask == null) { - break; - } - - if (nanoTime == 0L) { - nanoTime = ScheduledFutureTask.nanoTime(); - } - - if (delayedTask.deadlineNanos() <= nanoTime) { - delayedTaskQueue.remove(); - taskQueue.add(delayedTask); - } else { - break; + private void fetchFromScheduledTaskQueue() { + if (hasScheduledTasks()) { + long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + for (;;) { + Runnable scheduledTask = pollScheduledTask(nanoTime); + if (scheduledTask == null) { + break; + } + taskQueue.add(scheduledTask); } } } @@ -253,16 +242,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return !taskQueue.isEmpty(); } - /** - * Returns {@code true} if a scheduled task is ready for processing by {@link #runAllTasks()} or - * {@link #runAllTasks(long)}. - */ - protected boolean hasScheduledTasks() { - assert inEventLoop(); - ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); - return delayedTask != null && delayedTask.deadlineNanos() <= ScheduledFutureTask.nanoTime(); - } - /** * Return the number of tasks that are pending for processing. * @@ -303,7 +282,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { * @return {@code true} if and only if at least one task was run */ protected boolean runAllTasks() { - fetchFromDelayedQueue(); + fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; @@ -329,7 +308,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}. */ protected boolean runAllTasks(long timeoutNanos) { - fetchFromDelayedQueue(); + fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; @@ -371,12 +350,12 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { * Returns the amount of time left until the scheduled task with the closest dead line is executed. */ protected long delayNanos(long currentTimeNanos) { - ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); - if (delayedTask == null) { + ScheduledFutureTask scheduledTask = peekScheduledTask(); + if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; } - return delayedTask.delayNanos(currentTimeNanos); + return scheduledTask.delayNanos(currentTimeNanos); } /** @@ -604,7 +583,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { throw new IllegalStateException("must be invoked from an event loop"); } - cancelDelayedTasks(); + cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) { gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); @@ -645,21 +624,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return true; } - private void cancelDelayedTasks() { - if (delayedTaskQueue.isEmpty()) { - return; - } - - final ScheduledFutureTask[] delayedTasks = - delayedTaskQueue.toArray(new ScheduledFutureTask[delayedTaskQueue.size()]); - - for (ScheduledFutureTask task: delayedTasks) { - task.cancel(false); - } - - delayedTaskQueue.clear(); - } - @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { if (unit == null) { @@ -712,106 +676,11 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - if (callable == null) { - throw new NullPointerException("callable"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (period <= 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: > 0)", period)); - } - - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, Executors.callable(command, null), - ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (delay <= 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: > 0)", delay)); - } - - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, Executors.callable(command, null), - ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); - } - - private ScheduledFuture schedule(final ScheduledFutureTask task) { - if (task == null) { - throw new NullPointerException("task"); - } - - if (inEventLoop()) { - delayedTaskQueue.add(task); - } else { - execute(new Runnable() { - @Override - public void run() { - delayedTaskQueue.add(task); - } - }); - } - - return task; - } - private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { - delayedTaskQueue.add(new ScheduledFutureTask( - this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), + schedule(new ScheduledFutureTask( + this, Executors.callable(new PurgeTask(), null), ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); doStartThread(); } @@ -881,13 +750,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { private final class PurgeTask implements Runnable { @Override public void run() { - Iterator> i = delayedTaskQueue.iterator(); - while (i.hasNext()) { - ScheduledFutureTask task = i.next(); - if (task.isCancelled()) { - i.remove(); - } - } + purgeCancelledScheduledTasks(); } } } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index a081aabe49..4080816db3 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -224,18 +224,23 @@ public class EmbeddedChannel extends AbstractChannel { /** * Mark this {@link Channel} as finished. Any futher try to write data to it will fail. * - * * @return bufferReadable returns {@code true} if any of the used buffers has something left to read */ public boolean finish() { close(); runPendingTasks(); + + // Cancel all scheduled tasks that are left. + loop.cancelScheduledTasks(); + checkException(); + return !inboundMessages.isEmpty() || !outboundMessages.isEmpty(); } /** - * Run all tasks that are pending in the {@link EventLoop} for this {@link Channel} + * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop} + * for this {@link Channel} */ public void runPendingTasks() { try { @@ -243,6 +248,26 @@ public class EmbeddedChannel extends AbstractChannel { } catch (Exception e) { recordException(e); } + + try { + loop.runScheduledTasks(); + } catch (Exception e) { + recordException(e); + } + } + + /** + * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the + * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return + * {@code -1}. + */ + public long runScheduledPendingTasks() { + try { + return loop.runScheduledTasks(); + } catch (Exception e) { + recordException(e); + return loop.nextScheduledTask(); + } } private void recordException(Throwable cause) { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 21bb2c30ab..f88668ceda 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -15,7 +15,6 @@ */ package io.netty.channel.embedded; -import io.netty.channel.AbstractEventLoop; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; @@ -23,6 +22,9 @@ import io.netty.channel.ChannelHandlerInvoker; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.util.concurrent.EventExecutor; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.AbstractScheduledEventExecutor; import io.netty.util.concurrent.Future; import java.net.SocketAddress; @@ -32,10 +34,20 @@ import java.util.concurrent.TimeUnit; import static io.netty.channel.ChannelHandlerInvokerUtil.*; -final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandlerInvoker { +final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements ChannelHandlerInvoker, EventLoop { private final Queue tasks = new ArrayDeque(2); + @Override + public EventLoopGroup parent() { + return (EventLoopGroup) super.parent(); + } + + @Override + public EventLoop next() { + return (EventLoop) super.next(); + } + @Override public void execute(Runnable command) { if (command == null) { @@ -55,6 +67,27 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle } } + long runScheduledTasks() { + long time = AbstractScheduledEventExecutor.nanoTime(); + for (;;) { + Runnable task = pollScheduledTask(time); + if (task == null) { + return nextScheduledTaskNano(); + } + + task.run(); + } + } + + long nextScheduledTask() { + return nextScheduledTaskNano(); + } + + @Override + protected void cancelScheduledTasks() { + super.cancelScheduledTasks(); + } + @Override public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index 68b801ec8b..e32acfd1f7 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -21,9 +21,15 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ScheduledFuture; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + public class EmbeddedChannelTest { @Test @@ -52,4 +58,41 @@ public class EmbeddedChannelTest { Assert.assertSame(second, channel.readInbound()); Assert.assertNull(channel.readInbound()); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testScheduling() throws Exception { + EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter()); + final CountDownLatch latch = new CountDownLatch(2); + ScheduledFuture future = ch.eventLoop().schedule(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }, 1, TimeUnit.SECONDS); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + latch.countDown(); + } + }); + long next = ch.runScheduledPendingTasks(); + Assert.assertTrue(next > 0); + // Sleep for the nanoseconds but also give extra 50ms as the clock my not be very precise and so fail the test + // otherwise. + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(next) + 50); + Assert.assertEquals(-1, ch.runScheduledPendingTasks()); + latch.await(); + } + + @Test + public void testScheduledCancelled() throws Exception { + EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter()); + ScheduledFuture future = ch.eventLoop().schedule(new Runnable() { + @Override + public void run() { } + }, 1, TimeUnit.DAYS); + ch.finish(); + Assert.assertTrue(future.isCancelled()); + } }