Allow to use EmbeddedChannel.schedule*(...)

Motivation:

At the moment when EmbeddedChannel is used and a ChannelHandler tries to schedule and task it will throw an UnsupportedOperationException. This makes it impossible to test these handlers or even reuse them with EmbeddedChannel.

Modifications:

- Factor out reusable scheduling code into AbstractSchedulingEventExecutor
- Let EmbeddedEventLoop and SingleThreadEventExecutor extend AbstractSchedulingEventExecutor
- add EmbbededChannel.runScheduledPendingTasks() which allows to run all scheduled tasks that are ready

Result:

Embeddedchannel is now usable even with ChannelHandler that try to schedule tasks.
This commit is contained in:
Norman Maurer 2015-01-30 14:04:06 +01:00
parent 6d822195b0
commit 590c649e7d
7 changed files with 432 additions and 351 deletions

View File

@ -0,0 +1,257 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.CallableEventExecutorAdapter;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.RunnableEventExecutorAdapter;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Abstract base class for {@link EventExecutor}s that want to support scheduling.
*/
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected AbstractScheduledEventExecutor() {
}
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}
protected static long nanoTime() {
return ScheduledFutureTask.nanoTime();
}
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
return queue == null || queue.isEmpty();
}
/**
* Cancel all scheduled tasks.
*
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected void cancelScheduledTasks() {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (isNullOrEmpty(scheduledTaskQueue)) {
return;
}
final ScheduledFutureTask<?>[] scheduledTasks =
scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
for (ScheduledFutureTask<?> task: scheduledTasks) {
task.cancel(false);
}
scheduledTaskQueue.clear();
}
/**
* @see {@link #pollScheduledTask(long)}
*/
protected final Runnable pollScheduledTask() {
return pollScheduledTask(nanoTime());
}
/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
* You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
*/
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
/**
* Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
*/
protected final long nextScheduledTaskNano() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return -1;
}
return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
}
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
}
/**
* Returns {@code true} if a scheduled task is ready for processing.
*/
protected final boolean hasScheduledTasks() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, toCallable(command), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
this, toCallable(command),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, toCallable(command),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
void purgeCancelledScheduledTasks() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (isNullOrEmpty(scheduledTaskQueue)) {
return;
}
Iterator<ScheduledFutureTask<?>> i = scheduledTaskQueue.iterator();
while (i.hasNext()) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
}
private static Callable<Void> toCallable(final Runnable command) {
if (command instanceof RunnableEventExecutorAdapter) {
return new RunnableToCallableAdapter((RunnableEventExecutorAdapter) command);
} else {
return Executors.callable(command, null);
}
}
private static class RunnableToCallableAdapter implements CallableEventExecutorAdapter<Void> {
final RunnableEventExecutorAdapter runnable;
RunnableToCallableAdapter(RunnableEventExecutorAdapter runnable) {
this.runnable = runnable;
}
@Override
public EventExecutor executor() {
return runnable.executor();
}
@Override
public Callable<Void> unwrap() {
return null;
}
@Override
public Void call() throws Exception {
runnable.run();
return null;
}
}
}

View File

@ -18,11 +18,8 @@ package io.netty.util.concurrent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@ -35,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to
* this executor; use a dedicated executor.
*/
public final class GlobalEventExecutor extends AbstractEventExecutor {
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
@ -44,9 +41,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
this, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL);
private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
@ -57,7 +53,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
private GlobalEventExecutor() {
delayedTaskQueue.add(purgeTask);
scheduledTaskQueue().add(purgeTask);
}
/**
@ -68,8 +64,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
Runnable takeTask() {
BlockingQueue<Runnable> taskQueue = this.taskQueue;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
Runnable task = null;
try {
task = taskQueue.take();
@ -78,7 +74,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
}
return task;
} else {
long delayNanos = delayedTask.delayNanos();
long delayNanos = scheduledTask.delayNanos();
Runnable task;
if (delayNanos > 0) {
try {
@ -92,7 +88,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
}
if (task == null) {
fetchFromDelayedQueue();
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
@ -103,23 +99,15 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
}
}
private void fetchFromDelayedQueue() {
long nanoTime = 0L;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
break;
}
if (nanoTime == 0L) {
nanoTime = ScheduledFutureTask.nanoTime();
}
if (delayedTask.deadlineNanos() <= nanoTime) {
delayedTaskQueue.remove();
taskQueue.add(delayedTask);
} else {
break;
private void fetchFromScheduledTaskQueue() {
if (hasScheduledTasks()) {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
break;
}
taskQueue.add(scheduledTask);
}
}
}
@ -219,103 +207,6 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
}
}
// ScheduledExecutorService implementation
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(this, delayedTaskQueue,
Executors.<Void>callable(command, null), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null) {
throw new NullPointerException("callable");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(
this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(command, null),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(command, null),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (task == null) {
throw new NullPointerException("task");
}
if (inEventLoop()) {
delayedTaskQueue.add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
delayedTaskQueue.add(task);
}
});
}
return task;
}
private void startThread() {
if (started.compareAndSet(false, true)) {
Thread t = threadFactory.newThread(taskRunner);
@ -341,8 +232,9 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
}
}
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
// Terminate if there is no task in the queue (except the purge task).
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
// Mark the current thread as stopped.
// The following CAS must always success and must be uncontended,
// because only one thread should be running at the same time.
@ -350,7 +242,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
assert stopped;
// Check if there are pending entries added by execute() or schedule*() while we do CAS above.
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
// A) No new task was added and thus there's nothing to handle
// -> safe to terminate because there's nothing left to do
// B) A new thread started and handled all the new tasks.
@ -376,13 +268,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
private final class PurgeTask implements Runnable {
@Override
public void run() {
Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
while (i.hasNext()) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
purgeCancelledScheduledTasks();
}
}
}

View File

@ -39,27 +39,24 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
}
private final long id = nextTaskId.getAndIncrement();
private volatile Queue<ScheduledFutureTask<?>> delayedTaskQueue;
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
ScheduledFutureTask(EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
ScheduledFutureTask(EventExecutor executor,
Callable<V> callable, long nanoTime, long period) {
super(executor.unwrap(), callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
this.delayedTaskQueue = delayedTaskQueue;
deadlineNanos = nanoTime;
periodNanos = period;
}
ScheduledFutureTask(EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
ScheduledFutureTask(EventExecutor executor,
Callable<V> callable, long nanoTime) {
super(executor.unwrap(), callable);
this.delayedTaskQueue = delayedTaskQueue;
deadlineNanos = nanoTime;
periodNanos = 0;
}
@ -114,7 +111,11 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
// Try again in ten microseconds.
deadlineNanos = nanoTime() + TimeUnit.MICROSECONDS.toNanos(10);
if (!isCancelled()) {
delayedTaskQueue.add(this);
// scheduledTaskQueue can never be null as we lazy init it before submit the task!
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
}
}
} else {
@ -126,19 +127,22 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
}
// periodically executed tasks
} else {
// check if is done as it may was cancelled
if (!isCancelled()) {
task.call();
if (!executor().isShutdown()) {
// repeat task at fixed rate
if (periodNanos > 0) {
deadlineNanos += periodNanos;
// repeat task with fixed delay
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - periodNanos;
deadlineNanos = nanoTime() - p;
}
if (!isCancelled()) {
delayedTaskQueue.add(this);
// scheduledTaskQueue can never be null as we lazy init it before submit the task!
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
}
}
}
@ -189,7 +193,8 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
if (newExecutor instanceof SingleThreadEventExecutor) {
if (!newExecutor.isShutdown()) {
executor = newExecutor;
delayedTaskQueue = ((SingleThreadEventExecutor) newExecutor).delayedTaskQueue;
final Queue<ScheduledFutureTask<?>> scheduledTaskQueue
= ((SingleThreadEventExecutor) newExecutor).scheduledTaskQueue();
executor.execute(new OneTimeTask() {
@Override
@ -197,7 +202,7 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
// Execute as soon as possible.
deadlineNanos = nanoTime();
if (!isCancelled()) {
delayedTaskQueue.add(ScheduledFutureTask.this);
scheduledTaskQueue.add(ScheduledFutureTask.this);
}
}
});

View File

@ -15,21 +15,16 @@
*/
package io.netty.util.concurrent;
import io.netty.util.internal.CallableEventExecutorAdapter;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.RunnableEventExecutorAdapter;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@ -43,7 +38,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
*
*/
public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
@ -82,7 +77,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile Thread thread;
@ -186,8 +180,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
Runnable task = null;
try {
task = taskQueue.take();
@ -199,7 +193,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
return task;
} else {
long delayNanos = delayedTask.delayNanos();
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) {
try {
@ -210,11 +204,11 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
}
if (task == null) {
// We need to fetch the delayed tasks now as otherwise there may be a chance that
// delayed tasks are never executed if there is always one task in the taskQueue.
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
// scheduled tasks are never executed if there is always one task in the taskQueue.
// This is for example true for the read task of OIO Transport
// See https://github.com/netty/netty/issues/1614
fetchFromDelayedQueue();
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
@ -225,23 +219,15 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
}
private void fetchFromDelayedQueue() {
long nanoTime = 0L;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
break;
}
if (nanoTime == 0L) {
nanoTime = ScheduledFutureTask.nanoTime();
}
if (delayedTask.deadlineNanos() <= nanoTime) {
delayedTaskQueue.remove();
taskQueue.add(delayedTask);
} else {
break;
private void fetchFromScheduledTaskQueue() {
if (hasScheduledTasks()) {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
break;
}
taskQueue.add(scheduledTask);
}
}
}
@ -262,16 +248,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return !taskQueue.isEmpty();
}
/**
* Returns {@code true} if a scheduled task is ready for processing by {@link #runAllTasks()} or
* {@link #runAllTasks(long)}.
*/
protected boolean hasScheduledTasks() {
assert inEventLoop();
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
return delayedTask != null && delayedTask.deadlineNanos() <= ScheduledFutureTask.nanoTime();
}
/**
* Return the number of tasks that are pending for processing.
*
@ -312,7 +288,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* @return {@code true} if and only if at least one task was run
*/
protected boolean runAllTasks() {
fetchFromDelayedQueue();
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
@ -338,7 +314,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
*/
protected boolean runAllTasks(long timeoutNanos) {
fetchFromDelayedQueue();
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
@ -380,12 +356,12 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
*/
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return delayedTask.delayNanos(currentTimeNanos);
return scheduledTask.delayNanos(currentTimeNanos);
}
/**
@ -613,7 +589,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
throw new IllegalStateException("must be invoked from an event loop");
}
cancelDelayedTasks();
cancelScheduledTasks();
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
@ -654,21 +630,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return true;
}
private void cancelDelayedTasks() {
if (delayedTaskQueue.isEmpty()) {
return;
}
final ScheduledFutureTask<?>[] delayedTasks =
delayedTaskQueue.toArray(new ScheduledFutureTask<?>[delayedTaskQueue.size()]);
for (ScheduledFutureTask<?> task: delayedTasks) {
task.cancel(false);
}
delayedTaskQueue.clear();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
@ -721,109 +682,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, toCallable(command), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null) {
throw new NullPointerException("callable");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(
this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, toCallable(command),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, toCallable(command),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (task == null) {
throw new NullPointerException("task");
}
if (inEventLoop()) {
delayedTaskQueue.add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
delayedTaskQueue.add(task);
}
});
}
return task;
}
private static Callable<Void> toCallable(final Runnable command) {
if (command instanceof RunnableEventExecutorAdapter) {
return new RunnableToCallableAdapter((RunnableEventExecutorAdapter) command);
} else {
return Executors.callable(command, null);
}
}
protected void cleanupAndTerminate(boolean success) {
for (;;) {
int oldState = STATE_UPDATER.get(this);
@ -868,8 +726,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private void startExecution() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
scheduleExecution();
}
@ -888,38 +746,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private final class PurgeTask implements Runnable {
@Override
public void run() {
Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
while (i.hasNext()) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
}
}
private static class RunnableToCallableAdapter implements CallableEventExecutorAdapter<Void> {
final RunnableEventExecutorAdapter runnable;
RunnableToCallableAdapter(RunnableEventExecutorAdapter runnable) {
this.runnable = runnable;
}
@Override
public EventExecutor executor() {
return runnable.executor();
}
@Override
public Callable<Void> unwrap() {
return null;
}
@Override
public Void call() throws Exception {
runnable.run();
return null;
purgeCancelledScheduledTasks();
}
}
}

View File

@ -224,18 +224,23 @@ public class EmbeddedChannel extends AbstractChannel {
/**
* Mark this {@link Channel} as finished. Any futher try to write data to it will fail.
*
*
* @return bufferReadable returns {@code true} if any of the used buffers has something left to read
*/
public boolean finish() {
close();
runPendingTasks();
// Cancel all scheduled tasks that are left.
loop.cancelScheduledTasks();
checkException();
return !inboundMessages.isEmpty() || !outboundMessages.isEmpty();
}
/**
* Run all tasks that are pending in the {@link EventLoop} for this {@link Channel}
* Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
* for this {@link Channel}
*/
public void runPendingTasks() {
try {
@ -243,6 +248,26 @@ public class EmbeddedChannel extends AbstractChannel {
} catch (Exception e) {
recordException(e);
}
try {
loop.runScheduledTasks();
} catch (Exception e) {
recordException(e);
}
}
/**
* Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
* {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
* {@code -1}.
*/
public long runScheduledPendingTasks() {
try {
return loop.runScheduledTasks();
} catch (Exception e) {
recordException(e);
return loop.nextScheduledTask();
}
}
private void recordException(Throwable cause) {

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel.embedded;
import io.netty.channel.AbstractEventLoop;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
@ -23,6 +22,9 @@ import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.AbstractScheduledEventExecutor;
import io.netty.util.concurrent.Future;
import java.net.SocketAddress;
@ -32,10 +34,25 @@ import java.util.concurrent.TimeUnit;
import static io.netty.channel.ChannelHandlerInvokerUtil.*;
final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandlerInvoker {
final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements ChannelHandlerInvoker, EventLoop {
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
@Override
public EventLoop unwrap() {
return this;
}
@Override
public EventLoopGroup parent() {
return (EventLoopGroup) super.parent();
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public void execute(Runnable command) {
if (command == null) {
@ -55,6 +72,27 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
}
}
long runScheduledTasks() {
long time = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable task = pollScheduledTask(time);
if (task == null) {
return nextScheduledTaskNano();
}
task.run();
}
}
long nextScheduledTask() {
return nextScheduledTaskNano();
}
@Override
protected void cancelScheduledTasks() {
super.cancelScheduledTasks();
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();

View File

@ -21,9 +21,15 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class EmbeddedChannelTest {
@Test
@ -52,4 +58,41 @@ public class EmbeddedChannelTest {
Assert.assertSame(second, channel.readInbound());
Assert.assertNull(channel.readInbound());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testScheduling() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
final CountDownLatch latch = new CountDownLatch(2);
ScheduledFuture future = ch.eventLoop().schedule(new Runnable() {
@Override
public void run() {
latch.countDown();
}
}, 1, TimeUnit.SECONDS);
future.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
latch.countDown();
}
});
long next = ch.runScheduledPendingTasks();
Assert.assertTrue(next > 0);
// Sleep for the nanoseconds but also give extra 50ms as the clock my not be very precise and so fail the test
// otherwise.
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(next) + 50);
Assert.assertEquals(-1, ch.runScheduledPendingTasks());
latch.await();
}
@Test
public void testScheduledCancelled() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
ScheduledFuture<?> future = ch.eventLoop().schedule(new Runnable() {
@Override
public void run() { }
}, 1, TimeUnit.DAYS);
ch.finish();
Assert.assertTrue(future.isCancelled());
}
}