Allow to use EmbeddedChannel.schedule*(...)
Motivation: At the moment when EmbeddedChannel is used and a ChannelHandler tries to schedule and task it will throw an UnsupportedOperationException. This makes it impossible to test these handlers or even reuse them with EmbeddedChannel. Modifications: - Factor out reusable scheduling code into AbstractSchedulingEventExecutor - Let EmbeddedEventLoop and SingleThreadEventExecutor extend AbstractSchedulingEventExecutor - add EmbbededChannel.runScheduledPendingTasks() which allows to run all scheduled tasks that are ready Result: Embeddedchannel is now usable even with ChannelHandler that try to schedule tasks.
This commit is contained in:
parent
a2f77e743e
commit
6950bfb5d8
@ -0,0 +1,215 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2015 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.util.concurrent;
|
||||||
|
|
||||||
|
import io.netty.util.internal.ObjectUtil;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.PriorityQueue;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abstract base class for {@link EventExecutor}s that want to support scheduling.
|
||||||
|
*/
|
||||||
|
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
|
||||||
|
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
|
||||||
|
|
||||||
|
protected static long nanoTime() {
|
||||||
|
return ScheduledFutureTask.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
|
||||||
|
if (scheduledTaskQueue == null) {
|
||||||
|
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
|
||||||
|
}
|
||||||
|
return scheduledTaskQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
|
||||||
|
return queue == null || queue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel all scheduled tasks.
|
||||||
|
*
|
||||||
|
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
|
||||||
|
*/
|
||||||
|
protected void cancelScheduledTasks() {
|
||||||
|
assert inEventLoop();
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
|
||||||
|
if (isNullOrEmpty(scheduledTaskQueue)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ScheduledFutureTask<?>[] scheduledTasks =
|
||||||
|
scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
|
||||||
|
|
||||||
|
for (ScheduledFutureTask<?> task: scheduledTasks) {
|
||||||
|
task.cancel(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
scheduledTaskQueue.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see {@link #pollScheduledTask(long)}
|
||||||
|
*/
|
||||||
|
protected final Runnable pollScheduledTask() {
|
||||||
|
return pollScheduledTask(nanoTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
|
||||||
|
* You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
|
||||||
|
*/
|
||||||
|
protected final Runnable pollScheduledTask(long nanoTime) {
|
||||||
|
assert inEventLoop();
|
||||||
|
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
|
||||||
|
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
|
||||||
|
if (scheduledTask == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (scheduledTask.deadlineNanos() <= nanoTime) {
|
||||||
|
scheduledTaskQueue.remove();
|
||||||
|
return scheduledTask;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
|
||||||
|
*/
|
||||||
|
protected final long nextScheduledTaskNano() {
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
|
||||||
|
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
|
||||||
|
if (scheduledTask == null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
final ScheduledFutureTask<?> peekScheduledTask() {
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
|
||||||
|
if (scheduledTaskQueue == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return scheduledTaskQueue.peek();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if a scheduled task is ready for processing.
|
||||||
|
*/
|
||||||
|
protected final boolean hasScheduledTasks() {
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
|
||||||
|
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
|
||||||
|
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||||
|
ObjectUtil.checkNotNull(command, "command");
|
||||||
|
ObjectUtil.checkNotNull(unit, "unit");
|
||||||
|
if (delay < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("delay: %d (expected: >= 0)", delay));
|
||||||
|
}
|
||||||
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
|
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||||
|
ObjectUtil.checkNotNull(callable, "callable");
|
||||||
|
ObjectUtil.checkNotNull(unit, "unit");
|
||||||
|
if (delay < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("delay: %d (expected: >= 0)", delay));
|
||||||
|
}
|
||||||
|
return schedule(new ScheduledFutureTask<V>(
|
||||||
|
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||||
|
ObjectUtil.checkNotNull(command, "command");
|
||||||
|
ObjectUtil.checkNotNull(unit, "unit");
|
||||||
|
if (initialDelay < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||||
|
}
|
||||||
|
if (period <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("period: %d (expected: > 0)", period));
|
||||||
|
}
|
||||||
|
|
||||||
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
|
this, Executors.<Void>callable(command, null),
|
||||||
|
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||||
|
ObjectUtil.checkNotNull(command, "command");
|
||||||
|
ObjectUtil.checkNotNull(unit, "unit");
|
||||||
|
if (initialDelay < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||||
|
}
|
||||||
|
if (delay <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("delay: %d (expected: > 0)", delay));
|
||||||
|
}
|
||||||
|
|
||||||
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
|
this, Executors.<Void>callable(command, null),
|
||||||
|
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
||||||
|
}
|
||||||
|
|
||||||
|
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
||||||
|
if (inEventLoop()) {
|
||||||
|
scheduledTaskQueue().add(task);
|
||||||
|
} else {
|
||||||
|
execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
scheduledTaskQueue().add(task);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
void purgeCancelledScheduledTasks() {
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
|
||||||
|
if (isNullOrEmpty(scheduledTaskQueue)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Iterator<ScheduledFutureTask<?>> i = scheduledTaskQueue.iterator();
|
||||||
|
while (i.hasNext()) {
|
||||||
|
ScheduledFutureTask<?> task = i.next();
|
||||||
|
if (task.isCancelled()) {
|
||||||
|
i.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -18,11 +18,8 @@ package io.netty.util.concurrent;
|
|||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.PriorityQueue;
|
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
@ -35,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||||||
* task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to
|
* task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to
|
||||||
* this executor; use a dedicated executor.
|
* this executor; use a dedicated executor.
|
||||||
*/
|
*/
|
||||||
public final class GlobalEventExecutor extends AbstractEventExecutor {
|
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
|
||||||
|
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
|
||||||
|
|
||||||
@ -44,9 +41,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
|
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
|
||||||
|
|
||||||
final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
|
final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
|
||||||
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
|
|
||||||
final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>(
|
final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>(
|
||||||
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
|
this, Executors.<Void>callable(new PurgeTask(), null),
|
||||||
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL);
|
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL);
|
||||||
|
|
||||||
private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
|
private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
|
||||||
@ -57,7 +53,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
|
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
|
||||||
|
|
||||||
private GlobalEventExecutor() {
|
private GlobalEventExecutor() {
|
||||||
delayedTaskQueue.add(purgeTask);
|
scheduledTaskQueue().add(purgeTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -73,8 +69,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
Runnable takeTask() {
|
Runnable takeTask() {
|
||||||
BlockingQueue<Runnable> taskQueue = this.taskQueue;
|
BlockingQueue<Runnable> taskQueue = this.taskQueue;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
|
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
|
||||||
if (delayedTask == null) {
|
if (scheduledTask == null) {
|
||||||
Runnable task = null;
|
Runnable task = null;
|
||||||
try {
|
try {
|
||||||
task = taskQueue.take();
|
task = taskQueue.take();
|
||||||
@ -83,7 +79,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
return task;
|
return task;
|
||||||
} else {
|
} else {
|
||||||
long delayNanos = delayedTask.delayNanos();
|
long delayNanos = scheduledTask.delayNanos();
|
||||||
Runnable task;
|
Runnable task;
|
||||||
if (delayNanos > 0) {
|
if (delayNanos > 0) {
|
||||||
try {
|
try {
|
||||||
@ -96,7 +92,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
fetchFromDelayedQueue();
|
fetchFromScheduledTaskQueue();
|
||||||
task = taskQueue.poll();
|
task = taskQueue.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,23 +103,15 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fetchFromDelayedQueue() {
|
private void fetchFromScheduledTaskQueue() {
|
||||||
long nanoTime = 0L;
|
if (hasScheduledTasks()) {
|
||||||
|
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
|
Runnable scheduledTask = pollScheduledTask(nanoTime);
|
||||||
if (delayedTask == null) {
|
if (scheduledTask == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
taskQueue.add(scheduledTask);
|
||||||
if (nanoTime == 0L) {
|
|
||||||
nanoTime = ScheduledFutureTask.nanoTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (delayedTask.deadlineNanos() <= nanoTime) {
|
|
||||||
delayedTaskQueue.remove();
|
|
||||||
taskQueue.add(delayedTask);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -223,103 +211,6 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScheduledExecutorService implementation
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
|
||||||
if (command == null) {
|
|
||||||
throw new NullPointerException("command");
|
|
||||||
}
|
|
||||||
if (unit == null) {
|
|
||||||
throw new NullPointerException("unit");
|
|
||||||
}
|
|
||||||
if (delay < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("delay: %d (expected: >= 0)", delay));
|
|
||||||
}
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
|
||||||
this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
|
||||||
if (callable == null) {
|
|
||||||
throw new NullPointerException("callable");
|
|
||||||
}
|
|
||||||
if (unit == null) {
|
|
||||||
throw new NullPointerException("unit");
|
|
||||||
}
|
|
||||||
if (delay < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("delay: %d (expected: >= 0)", delay));
|
|
||||||
}
|
|
||||||
return schedule(new ScheduledFutureTask<V>(
|
|
||||||
this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
|
||||||
if (command == null) {
|
|
||||||
throw new NullPointerException("command");
|
|
||||||
}
|
|
||||||
if (unit == null) {
|
|
||||||
throw new NullPointerException("unit");
|
|
||||||
}
|
|
||||||
if (initialDelay < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
|
||||||
}
|
|
||||||
if (period <= 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("period: %d (expected: > 0)", period));
|
|
||||||
}
|
|
||||||
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
|
||||||
this, delayedTaskQueue, Executors.<Void>callable(command, null),
|
|
||||||
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
|
||||||
if (command == null) {
|
|
||||||
throw new NullPointerException("command");
|
|
||||||
}
|
|
||||||
if (unit == null) {
|
|
||||||
throw new NullPointerException("unit");
|
|
||||||
}
|
|
||||||
if (initialDelay < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
|
||||||
}
|
|
||||||
if (delay <= 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("delay: %d (expected: > 0)", delay));
|
|
||||||
}
|
|
||||||
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
|
||||||
this, delayedTaskQueue, Executors.<Void>callable(command, null),
|
|
||||||
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
|
||||||
}
|
|
||||||
|
|
||||||
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
|
||||||
if (task == null) {
|
|
||||||
throw new NullPointerException("task");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (inEventLoop()) {
|
|
||||||
delayedTaskQueue.add(task);
|
|
||||||
} else {
|
|
||||||
execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
delayedTaskQueue.add(task);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return task;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startThread() {
|
private void startThread() {
|
||||||
if (started.compareAndSet(false, true)) {
|
if (started.compareAndSet(false, true)) {
|
||||||
Thread t = threadFactory.newThread(taskRunner);
|
Thread t = threadFactory.newThread(taskRunner);
|
||||||
@ -345,8 +236,9 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
|
||||||
// Terminate if there is no task in the queue (except the purge task).
|
// Terminate if there is no task in the queue (except the purge task).
|
||||||
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
|
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
|
||||||
// Mark the current thread as stopped.
|
// Mark the current thread as stopped.
|
||||||
// The following CAS must always success and must be uncontended,
|
// The following CAS must always success and must be uncontended,
|
||||||
// because only one thread should be running at the same time.
|
// because only one thread should be running at the same time.
|
||||||
@ -354,7 +246,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
assert stopped;
|
assert stopped;
|
||||||
|
|
||||||
// Check if there are pending entries added by execute() or schedule*() while we do CAS above.
|
// Check if there are pending entries added by execute() or schedule*() while we do CAS above.
|
||||||
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
|
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
|
||||||
// A) No new task was added and thus there's nothing to handle
|
// A) No new task was added and thus there's nothing to handle
|
||||||
// -> safe to terminate because there's nothing left to do
|
// -> safe to terminate because there's nothing left to do
|
||||||
// B) A new thread started and handled all the new tasks.
|
// B) A new thread started and handled all the new tasks.
|
||||||
@ -380,13 +272,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
|
|||||||
private final class PurgeTask implements Runnable {
|
private final class PurgeTask implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
|
purgeCancelledScheduledTasks();
|
||||||
while (i.hasNext()) {
|
|
||||||
ScheduledFutureTask<?> task = i.next();
|
|
||||||
if (task.isCancelled()) {
|
|
||||||
i.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,37 +36,34 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
|
|||||||
}
|
}
|
||||||
|
|
||||||
private final long id = nextTaskId.getAndIncrement();
|
private final long id = nextTaskId.getAndIncrement();
|
||||||
private final Queue<ScheduledFutureTask<?>> delayedTaskQueue;
|
|
||||||
private long deadlineNanos;
|
private long deadlineNanos;
|
||||||
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
||||||
private final long periodNanos;
|
private final long periodNanos;
|
||||||
|
|
||||||
ScheduledFutureTask(
|
ScheduledFutureTask(
|
||||||
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
|
AbstractScheduledEventExecutor executor,
|
||||||
Runnable runnable, V result, long nanoTime) {
|
Runnable runnable, V result, long nanoTime) {
|
||||||
|
|
||||||
this(executor, delayedTaskQueue, toCallable(runnable, result), nanoTime);
|
this(executor, toCallable(runnable, result), nanoTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
ScheduledFutureTask(
|
ScheduledFutureTask(
|
||||||
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
|
AbstractScheduledEventExecutor executor,
|
||||||
Callable<V> callable, long nanoTime, long period) {
|
Callable<V> callable, long nanoTime, long period) {
|
||||||
|
|
||||||
super(executor, callable);
|
super(executor, callable);
|
||||||
if (period == 0) {
|
if (period == 0) {
|
||||||
throw new IllegalArgumentException("period: 0 (expected: != 0)");
|
throw new IllegalArgumentException("period: 0 (expected: != 0)");
|
||||||
}
|
}
|
||||||
this.delayedTaskQueue = delayedTaskQueue;
|
|
||||||
deadlineNanos = nanoTime;
|
deadlineNanos = nanoTime;
|
||||||
periodNanos = period;
|
periodNanos = period;
|
||||||
}
|
}
|
||||||
|
|
||||||
ScheduledFutureTask(
|
ScheduledFutureTask(
|
||||||
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
|
AbstractScheduledEventExecutor executor,
|
||||||
Callable<V> callable, long nanoTime) {
|
Callable<V> callable, long nanoTime) {
|
||||||
|
|
||||||
super(executor, callable);
|
super(executor, callable);
|
||||||
this.delayedTaskQueue = delayedTaskQueue;
|
|
||||||
deadlineNanos = nanoTime;
|
deadlineNanos = nanoTime;
|
||||||
periodNanos = 0;
|
periodNanos = 0;
|
||||||
}
|
}
|
||||||
@ -135,7 +132,11 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
|
|||||||
deadlineNanos = nanoTime() - p;
|
deadlineNanos = nanoTime() - p;
|
||||||
}
|
}
|
||||||
if (!isCancelled()) {
|
if (!isCancelled()) {
|
||||||
delayedTaskQueue.add(this);
|
// scheduledTaskQueue can never be null as we lazy init it before submit the task!
|
||||||
|
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
|
||||||
|
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
|
||||||
|
assert scheduledTaskQueue != null;
|
||||||
|
scheduledTaskQueue.add(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,14 +20,11 @@ import io.netty.util.internal.logging.InternalLogger;
|
|||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.PriorityQueue;
|
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
@ -40,7 +37,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
|||||||
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
|
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor {
|
||||||
|
|
||||||
private static final InternalLogger logger =
|
private static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
|
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
|
||||||
@ -71,8 +68,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
|
|
||||||
private final EventExecutorGroup parent;
|
private final EventExecutorGroup parent;
|
||||||
private final Queue<Runnable> taskQueue;
|
private final Queue<Runnable> taskQueue;
|
||||||
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
|
|
||||||
|
|
||||||
private final Thread thread;
|
private final Thread thread;
|
||||||
private final Semaphore threadLock = new Semaphore(0);
|
private final Semaphore threadLock = new Semaphore(0);
|
||||||
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
|
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
|
||||||
@ -215,8 +210,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
|
|
||||||
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
|
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
|
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
|
||||||
if (delayedTask == null) {
|
if (scheduledTask == null) {
|
||||||
Runnable task = null;
|
Runnable task = null;
|
||||||
try {
|
try {
|
||||||
task = taskQueue.take();
|
task = taskQueue.take();
|
||||||
@ -228,7 +223,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
return task;
|
return task;
|
||||||
} else {
|
} else {
|
||||||
long delayNanos = delayedTask.delayNanos();
|
long delayNanos = scheduledTask.delayNanos();
|
||||||
Runnable task = null;
|
Runnable task = null;
|
||||||
if (delayNanos > 0) {
|
if (delayNanos > 0) {
|
||||||
try {
|
try {
|
||||||
@ -238,11 +233,11 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
// We need to fetch the delayed tasks now as otherwise there may be a chance that
|
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
|
||||||
// delayed tasks are never executed if there is always one task in the taskQueue.
|
// scheduled tasks are never executed if there is always one task in the taskQueue.
|
||||||
// This is for example true for the read task of OIO Transport
|
// This is for example true for the read task of OIO Transport
|
||||||
// See https://github.com/netty/netty/issues/1614
|
// See https://github.com/netty/netty/issues/1614
|
||||||
fetchFromDelayedQueue();
|
fetchFromScheduledTaskQueue();
|
||||||
task = taskQueue.poll();
|
task = taskQueue.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,23 +248,15 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fetchFromDelayedQueue() {
|
private void fetchFromScheduledTaskQueue() {
|
||||||
long nanoTime = 0L;
|
if (hasScheduledTasks()) {
|
||||||
|
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
|
Runnable scheduledTask = pollScheduledTask(nanoTime);
|
||||||
if (delayedTask == null) {
|
if (scheduledTask == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
taskQueue.add(scheduledTask);
|
||||||
if (nanoTime == 0L) {
|
|
||||||
nanoTime = ScheduledFutureTask.nanoTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (delayedTask.deadlineNanos() <= nanoTime) {
|
|
||||||
delayedTaskQueue.remove();
|
|
||||||
taskQueue.add(delayedTask);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -290,16 +277,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
return !taskQueue.isEmpty();
|
return !taskQueue.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns {@code true} if a scheduled task is ready for processing by {@link #runAllTasks()} or
|
|
||||||
* {@link #runAllTasks(long)}.
|
|
||||||
*/
|
|
||||||
protected boolean hasScheduledTasks() {
|
|
||||||
assert inEventLoop();
|
|
||||||
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
|
|
||||||
return delayedTask != null && delayedTask.deadlineNanos() <= ScheduledFutureTask.nanoTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the number of tasks that are pending for processing.
|
* Return the number of tasks that are pending for processing.
|
||||||
*
|
*
|
||||||
@ -340,7 +317,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
* @return {@code true} if and only if at least one task was run
|
* @return {@code true} if and only if at least one task was run
|
||||||
*/
|
*/
|
||||||
protected boolean runAllTasks() {
|
protected boolean runAllTasks() {
|
||||||
fetchFromDelayedQueue();
|
fetchFromScheduledTaskQueue();
|
||||||
Runnable task = pollTask();
|
Runnable task = pollTask();
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
return false;
|
return false;
|
||||||
@ -366,7 +343,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
|
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
|
||||||
*/
|
*/
|
||||||
protected boolean runAllTasks(long timeoutNanos) {
|
protected boolean runAllTasks(long timeoutNanos) {
|
||||||
fetchFromDelayedQueue();
|
fetchFromScheduledTaskQueue();
|
||||||
Runnable task = pollTask();
|
Runnable task = pollTask();
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
return false;
|
return false;
|
||||||
@ -408,12 +385,12 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
|
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
|
||||||
*/
|
*/
|
||||||
protected long delayNanos(long currentTimeNanos) {
|
protected long delayNanos(long currentTimeNanos) {
|
||||||
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
|
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
|
||||||
if (delayedTask == null) {
|
if (scheduledTask == null) {
|
||||||
return SCHEDULE_PURGE_INTERVAL;
|
return SCHEDULE_PURGE_INTERVAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return delayedTask.delayNanos(currentTimeNanos);
|
return scheduledTask.delayNanos(currentTimeNanos);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -641,7 +618,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
throw new IllegalStateException("must be invoked from an event loop");
|
throw new IllegalStateException("must be invoked from an event loop");
|
||||||
}
|
}
|
||||||
|
|
||||||
cancelDelayedTasks();
|
cancelScheduledTasks();
|
||||||
|
|
||||||
if (gracefulShutdownStartTime == 0) {
|
if (gracefulShutdownStartTime == 0) {
|
||||||
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
|
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
|
||||||
@ -682,21 +659,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cancelDelayedTasks() {
|
|
||||||
if (delayedTaskQueue.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final ScheduledFutureTask<?>[] delayedTasks =
|
|
||||||
delayedTaskQueue.toArray(new ScheduledFutureTask<?>[delayedTaskQueue.size()]);
|
|
||||||
|
|
||||||
for (ScheduledFutureTask<?> task: delayedTasks) {
|
|
||||||
task.cancel(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
delayedTaskQueue.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
if (unit == null) {
|
if (unit == null) {
|
||||||
@ -749,106 +711,11 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
|
|
||||||
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||||
|
|
||||||
@Override
|
|
||||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
|
||||||
if (command == null) {
|
|
||||||
throw new NullPointerException("command");
|
|
||||||
}
|
|
||||||
if (unit == null) {
|
|
||||||
throw new NullPointerException("unit");
|
|
||||||
}
|
|
||||||
if (delay < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("delay: %d (expected: >= 0)", delay));
|
|
||||||
}
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
|
||||||
this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
|
||||||
if (callable == null) {
|
|
||||||
throw new NullPointerException("callable");
|
|
||||||
}
|
|
||||||
if (unit == null) {
|
|
||||||
throw new NullPointerException("unit");
|
|
||||||
}
|
|
||||||
if (delay < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("delay: %d (expected: >= 0)", delay));
|
|
||||||
}
|
|
||||||
return schedule(new ScheduledFutureTask<V>(
|
|
||||||
this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
|
||||||
if (command == null) {
|
|
||||||
throw new NullPointerException("command");
|
|
||||||
}
|
|
||||||
if (unit == null) {
|
|
||||||
throw new NullPointerException("unit");
|
|
||||||
}
|
|
||||||
if (initialDelay < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
|
||||||
}
|
|
||||||
if (period <= 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("period: %d (expected: > 0)", period));
|
|
||||||
}
|
|
||||||
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
|
||||||
this, delayedTaskQueue, Executors.<Void>callable(command, null),
|
|
||||||
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
|
||||||
if (command == null) {
|
|
||||||
throw new NullPointerException("command");
|
|
||||||
}
|
|
||||||
if (unit == null) {
|
|
||||||
throw new NullPointerException("unit");
|
|
||||||
}
|
|
||||||
if (initialDelay < 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
|
||||||
}
|
|
||||||
if (delay <= 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format("delay: %d (expected: > 0)", delay));
|
|
||||||
}
|
|
||||||
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
|
||||||
this, delayedTaskQueue, Executors.<Void>callable(command, null),
|
|
||||||
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
|
||||||
}
|
|
||||||
|
|
||||||
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
|
||||||
if (task == null) {
|
|
||||||
throw new NullPointerException("task");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (inEventLoop()) {
|
|
||||||
delayedTaskQueue.add(task);
|
|
||||||
} else {
|
|
||||||
execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
delayedTaskQueue.add(task);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return task;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startThread() {
|
private void startThread() {
|
||||||
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
|
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
|
||||||
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
|
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
|
||||||
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
|
schedule(new ScheduledFutureTask<Void>(
|
||||||
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
|
this, Executors.<Void>callable(new PurgeTask(), null),
|
||||||
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
|
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
|
||||||
thread.start();
|
thread.start();
|
||||||
}
|
}
|
||||||
@ -858,13 +725,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
private final class PurgeTask implements Runnable {
|
private final class PurgeTask implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
|
purgeCancelledScheduledTasks();
|
||||||
while (i.hasNext()) {
|
|
||||||
ScheduledFutureTask<?> task = i.next();
|
|
||||||
if (task.isCancelled()) {
|
|
||||||
i.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -217,18 +217,23 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
/**
|
/**
|
||||||
* Mark this {@link Channel} as finished. Any futher try to write data to it will fail.
|
* Mark this {@link Channel} as finished. Any futher try to write data to it will fail.
|
||||||
*
|
*
|
||||||
*
|
|
||||||
* @return bufferReadable returns {@code true} if any of the used buffers has something left to read
|
* @return bufferReadable returns {@code true} if any of the used buffers has something left to read
|
||||||
*/
|
*/
|
||||||
public boolean finish() {
|
public boolean finish() {
|
||||||
close();
|
close();
|
||||||
runPendingTasks();
|
runPendingTasks();
|
||||||
|
|
||||||
|
// Cancel all scheduled tasks that are left.
|
||||||
|
loop.cancelScheduledTasks();
|
||||||
|
|
||||||
checkException();
|
checkException();
|
||||||
|
|
||||||
return !inboundMessages.isEmpty() || !outboundMessages.isEmpty();
|
return !inboundMessages.isEmpty() || !outboundMessages.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run all tasks that are pending in the {@link EventLoop} for this {@link Channel}
|
* Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
|
||||||
|
* for this {@link Channel}
|
||||||
*/
|
*/
|
||||||
public void runPendingTasks() {
|
public void runPendingTasks() {
|
||||||
try {
|
try {
|
||||||
@ -236,6 +241,26 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
recordException(e);
|
recordException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
loop.runScheduledTasks();
|
||||||
|
} catch (Exception e) {
|
||||||
|
recordException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
|
||||||
|
* {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
|
||||||
|
* {@code -1}.
|
||||||
|
*/
|
||||||
|
public long runScheduledPendingTasks() {
|
||||||
|
try {
|
||||||
|
return loop.runScheduledTasks();
|
||||||
|
} catch (Exception e) {
|
||||||
|
recordException(e);
|
||||||
|
return loop.nextScheduledTask();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void recordException(Throwable cause) {
|
private void recordException(Throwable cause) {
|
||||||
|
@ -21,14 +21,14 @@ import io.netty.channel.ChannelPromise;
|
|||||||
import io.netty.channel.DefaultChannelPromise;
|
import io.netty.channel.DefaultChannelPromise;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.util.concurrent.AbstractEventExecutor;
|
import io.netty.util.concurrent.AbstractScheduledEventExecutor;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop {
|
final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
|
||||||
|
|
||||||
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
|
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
|
||||||
|
|
||||||
@ -51,6 +51,27 @@ final class EmbeddedEventLoop extends AbstractEventExecutor implements EventLoop
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long runScheduledTasks() {
|
||||||
|
long time = AbstractScheduledEventExecutor.nanoTime();
|
||||||
|
for (;;) {
|
||||||
|
Runnable task = pollScheduledTask(time);
|
||||||
|
if (task == null) {
|
||||||
|
return nextScheduledTaskNano();
|
||||||
|
}
|
||||||
|
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long nextScheduledTask() {
|
||||||
|
return nextScheduledTaskNano();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void cancelScheduledTasks() {
|
||||||
|
super.cancelScheduledTasks();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
|
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
@ -21,9 +21,15 @@ import io.netty.channel.ChannelHandlerContext;
|
|||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
import io.netty.util.concurrent.Future;
|
||||||
|
import io.netty.util.concurrent.FutureListener;
|
||||||
|
import io.netty.util.concurrent.ScheduledFuture;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class EmbeddedChannelTest {
|
public class EmbeddedChannelTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -52,4 +58,41 @@ public class EmbeddedChannelTest {
|
|||||||
Assert.assertSame(second, channel.readInbound());
|
Assert.assertSame(second, channel.readInbound());
|
||||||
Assert.assertNull(channel.readInbound());
|
Assert.assertNull(channel.readInbound());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
|
@Test
|
||||||
|
public void testScheduling() throws Exception {
|
||||||
|
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
|
||||||
|
final CountDownLatch latch = new CountDownLatch(2);
|
||||||
|
ScheduledFuture future = ch.eventLoop().schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}, 1, TimeUnit.SECONDS);
|
||||||
|
future.addListener(new FutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future future) throws Exception {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
long next = ch.runScheduledPendingTasks();
|
||||||
|
Assert.assertTrue(next > 0);
|
||||||
|
// Sleep for the nanoseconds but also give extra 50ms as the clock my not be very precise and so fail the test
|
||||||
|
// otherwise.
|
||||||
|
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(next) + 50);
|
||||||
|
Assert.assertEquals(-1, ch.runScheduledPendingTasks());
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduledCancelled() throws Exception {
|
||||||
|
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
|
||||||
|
ScheduledFuture<?> future = ch.eventLoop().schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() { }
|
||||||
|
}, 1, TimeUnit.DAYS);
|
||||||
|
ch.finish();
|
||||||
|
Assert.assertTrue(future.isCancelled());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user