Add GlobalEventExecutor
- Related issue: #1389 - Also extracted SingleThreadEventExecutor.ScheduledFutureTask into a top level class for a reuse
This commit is contained in:
parent
786501d972
commit
1749210985
@ -34,6 +34,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean inEventLoop() {
|
||||||
|
return inEventLoop(Thread.currentThread());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<EventExecutor> iterator() {
|
public Iterator<EventExecutor> iterator() {
|
||||||
return new EventExecutorIterator();
|
return new EventExecutorIterator();
|
||||||
|
@ -0,0 +1,356 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.util.concurrent;
|
||||||
|
|
||||||
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.PriorityQueue;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no
|
||||||
|
* task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to
|
||||||
|
* this executor; use a dedicated executor.
|
||||||
|
*/
|
||||||
|
public final class GlobalEventExecutor extends AbstractEventExecutor {
|
||||||
|
|
||||||
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
|
||||||
|
|
||||||
|
private static final int ST_NOT_STARTED = 1;
|
||||||
|
private static final int ST_STARTED = 2;
|
||||||
|
|
||||||
|
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||||
|
|
||||||
|
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
|
||||||
|
|
||||||
|
final Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
|
||||||
|
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
|
||||||
|
final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>(
|
||||||
|
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
|
||||||
|
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL);
|
||||||
|
|
||||||
|
private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
|
||||||
|
private final TaskRunner taskRunner = new TaskRunner();
|
||||||
|
private final Object stateLock = new Object();
|
||||||
|
|
||||||
|
volatile Thread thread;
|
||||||
|
private volatile int state = ST_NOT_STARTED;
|
||||||
|
|
||||||
|
private GlobalEventExecutor() {
|
||||||
|
delayedTaskQueue.add(purgeTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventExecutorGroup parent() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
|
||||||
|
*
|
||||||
|
* @return {@code null} if the executor thread has been interrupted or waken up.
|
||||||
|
*/
|
||||||
|
Runnable takeTask() {
|
||||||
|
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
|
||||||
|
for (;;) {
|
||||||
|
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
|
||||||
|
if (delayedTask == null) {
|
||||||
|
Runnable task = null;
|
||||||
|
try {
|
||||||
|
task = taskQueue.take();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
|
return task;
|
||||||
|
} else {
|
||||||
|
long delayNanos = delayedTask.delayNanos();
|
||||||
|
Runnable task;
|
||||||
|
if (delayNanos > 0) {
|
||||||
|
try {
|
||||||
|
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
task = taskQueue.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task == null) {
|
||||||
|
fetchFromDelayedQueue();
|
||||||
|
task = taskQueue.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task != null) {
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchFromDelayedQueue() {
|
||||||
|
long nanoTime = 0L;
|
||||||
|
for (;;) {
|
||||||
|
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
|
||||||
|
if (delayedTask == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nanoTime == 0L) {
|
||||||
|
nanoTime = ScheduledFutureTask.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (delayedTask.deadlineNanos() <= nanoTime) {
|
||||||
|
delayedTaskQueue.remove();
|
||||||
|
taskQueue.add(delayedTask);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the number of tasks that are pending for processing.
|
||||||
|
*
|
||||||
|
* <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
|
||||||
|
* SingleThreadEventExecutor. So use it was care!</strong>
|
||||||
|
*/
|
||||||
|
public int pendingTasks() {
|
||||||
|
return taskQueue.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
|
||||||
|
* before.
|
||||||
|
*/
|
||||||
|
private void addTask(Runnable task) {
|
||||||
|
if (task == null) {
|
||||||
|
throw new NullPointerException("task");
|
||||||
|
}
|
||||||
|
taskQueue.add(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean inEventLoop(Thread thread) {
|
||||||
|
return thread == this.thread;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Deprecated
|
||||||
|
public void shutdown() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isShuttingDown() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isShutdown() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isTerminated() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean awaitTermination(long timeout, TimeUnit unit) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(Runnable task) {
|
||||||
|
if (task == null) {
|
||||||
|
throw new NullPointerException("task");
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean inEventLoop = inEventLoop();
|
||||||
|
if (inEventLoop) {
|
||||||
|
addTask(task);
|
||||||
|
} else {
|
||||||
|
startThread();
|
||||||
|
addTask(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ScheduledExecutorService implementation
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||||
|
if (command == null) {
|
||||||
|
throw new NullPointerException("command");
|
||||||
|
}
|
||||||
|
if (unit == null) {
|
||||||
|
throw new NullPointerException("unit");
|
||||||
|
}
|
||||||
|
if (delay < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("delay: %d (expected: >= 0)", delay));
|
||||||
|
}
|
||||||
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
|
this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||||
|
if (callable == null) {
|
||||||
|
throw new NullPointerException("callable");
|
||||||
|
}
|
||||||
|
if (unit == null) {
|
||||||
|
throw new NullPointerException("unit");
|
||||||
|
}
|
||||||
|
if (delay < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("delay: %d (expected: >= 0)", delay));
|
||||||
|
}
|
||||||
|
return schedule(new ScheduledFutureTask<V>(
|
||||||
|
this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||||
|
if (command == null) {
|
||||||
|
throw new NullPointerException("command");
|
||||||
|
}
|
||||||
|
if (unit == null) {
|
||||||
|
throw new NullPointerException("unit");
|
||||||
|
}
|
||||||
|
if (initialDelay < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||||
|
}
|
||||||
|
if (period <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("period: %d (expected: > 0)", period));
|
||||||
|
}
|
||||||
|
|
||||||
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
|
this, delayedTaskQueue, Executors.<Void>callable(command, null),
|
||||||
|
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||||
|
if (command == null) {
|
||||||
|
throw new NullPointerException("command");
|
||||||
|
}
|
||||||
|
if (unit == null) {
|
||||||
|
throw new NullPointerException("unit");
|
||||||
|
}
|
||||||
|
if (initialDelay < 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||||
|
}
|
||||||
|
if (delay <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("delay: %d (expected: > 0)", delay));
|
||||||
|
}
|
||||||
|
|
||||||
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
|
this, delayedTaskQueue, Executors.<Void>callable(command, null),
|
||||||
|
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
||||||
|
if (task == null) {
|
||||||
|
throw new NullPointerException("task");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inEventLoop()) {
|
||||||
|
delayedTaskQueue.add(task);
|
||||||
|
} else {
|
||||||
|
execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
delayedTaskQueue.add(task);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startThread() {
|
||||||
|
synchronized (stateLock) {
|
||||||
|
if (state == ST_NOT_STARTED) {
|
||||||
|
state = ST_STARTED;
|
||||||
|
|
||||||
|
thread = threadFactory.newThread(taskRunner);
|
||||||
|
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final class TaskRunner implements Runnable {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (;;) {
|
||||||
|
Runnable task = takeTask();
|
||||||
|
if (task != null) {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("Unexpected exception from the global event executor: ", t);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task != purgeTask) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
|
||||||
|
synchronized (stateLock) {
|
||||||
|
// Terminate if there is no task in the queue (except the purge task).
|
||||||
|
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
|
||||||
|
state = ST_NOT_STARTED;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class PurgeTask implements Runnable {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
|
||||||
|
while (i.hasNext()) {
|
||||||
|
ScheduledFutureTask<?> task = i.next();
|
||||||
|
if (task.isCancelled()) {
|
||||||
|
i.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,148 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2013 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util.concurrent;
|
||||||
|
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Delayed;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
|
||||||
|
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
|
||||||
|
private static final AtomicLong nextTaskId = new AtomicLong();
|
||||||
|
private static final long START_TIME = System.nanoTime();
|
||||||
|
|
||||||
|
static long nanoTime() {
|
||||||
|
return System.nanoTime() - START_TIME;
|
||||||
|
}
|
||||||
|
|
||||||
|
static long deadlineNanos(long delay) {
|
||||||
|
return nanoTime() + delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
private final long id = nextTaskId.getAndIncrement();
|
||||||
|
private final Queue<ScheduledFutureTask<?>> delayedTaskQueue;
|
||||||
|
private long deadlineNanos;
|
||||||
|
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
||||||
|
private final long periodNanos;
|
||||||
|
|
||||||
|
ScheduledFutureTask(
|
||||||
|
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
|
||||||
|
Runnable runnable, V result, long nanoTime) {
|
||||||
|
|
||||||
|
this(executor, delayedTaskQueue, Executors.callable(runnable, result), nanoTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduledFutureTask(
|
||||||
|
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
|
||||||
|
Callable<V> callable, long nanoTime, long period) {
|
||||||
|
|
||||||
|
super(executor, callable);
|
||||||
|
if (period == 0) {
|
||||||
|
throw new IllegalArgumentException("period: 0 (expected: != 0)");
|
||||||
|
}
|
||||||
|
this.delayedTaskQueue = delayedTaskQueue;
|
||||||
|
deadlineNanos = nanoTime;
|
||||||
|
periodNanos = period;
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduledFutureTask(
|
||||||
|
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
|
||||||
|
Callable<V> callable, long nanoTime) {
|
||||||
|
|
||||||
|
super(executor, callable);
|
||||||
|
this.delayedTaskQueue = delayedTaskQueue;
|
||||||
|
deadlineNanos = nanoTime;
|
||||||
|
periodNanos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected EventExecutor executor() {
|
||||||
|
return super.executor();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long deadlineNanos() {
|
||||||
|
return deadlineNanos;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long delayNanos() {
|
||||||
|
return Math.max(0, deadlineNanos() - nanoTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
public long delayNanos(long currentTimeNanos) {
|
||||||
|
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(TimeUnit unit) {
|
||||||
|
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(Delayed o) {
|
||||||
|
if (this == o) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
|
||||||
|
long d = deadlineNanos() - that.deadlineNanos();
|
||||||
|
if (d < 0) {
|
||||||
|
return -1;
|
||||||
|
} else if (d > 0) {
|
||||||
|
return 1;
|
||||||
|
} else if (id < that.id) {
|
||||||
|
return -1;
|
||||||
|
} else if (id == that.id) {
|
||||||
|
throw new Error();
|
||||||
|
} else {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
assert executor().inEventLoop();
|
||||||
|
try {
|
||||||
|
if (periodNanos == 0) {
|
||||||
|
if (setUncancellableInternal()) {
|
||||||
|
V result = task.call();
|
||||||
|
setSuccessInternal(result);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// check if is done as it may was cancelled
|
||||||
|
if (!isCancelled()) {
|
||||||
|
task.call();
|
||||||
|
if (!executor().isShutdown()) {
|
||||||
|
long p = periodNanos;
|
||||||
|
if (p > 0) {
|
||||||
|
deadlineNanos += p;
|
||||||
|
} else {
|
||||||
|
deadlineNanos = nanoTime() - p;
|
||||||
|
}
|
||||||
|
if (!isCancelled()) {
|
||||||
|
delayedTaskQueue.add(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
setFailureInternal(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -27,14 +27,12 @@ import java.util.Queue;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.Delayed;
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
|
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
|
||||||
@ -243,7 +241,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (nanoTime == 0L) {
|
if (nanoTime == 0L) {
|
||||||
nanoTime = nanoTime();
|
nanoTime = ScheduledFutureTask.nanoTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (delayedTask.deadlineNanos() <= nanoTime) {
|
if (delayedTask.deadlineNanos() <= nanoTime) {
|
||||||
@ -326,7 +324,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
|
|
||||||
task = pollTask();
|
task = pollTask();
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
lastExecutionTime = nanoTime();
|
lastExecutionTime = ScheduledFutureTask.nanoTime();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -343,7 +341,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
final long deadline = nanoTime() + timeoutNanos;
|
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
|
||||||
long runTasks = 0;
|
long runTasks = 0;
|
||||||
long lastExecutionTime;
|
long lastExecutionTime;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
@ -358,7 +356,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
|
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
|
||||||
// XXX: Hard-coded value - will make it configurable if it is really a problem.
|
// XXX: Hard-coded value - will make it configurable if it is really a problem.
|
||||||
if ((runTasks & 0x3F) == 0) {
|
if ((runTasks & 0x3F) == 0) {
|
||||||
lastExecutionTime = nanoTime();
|
lastExecutionTime = ScheduledFutureTask.nanoTime();
|
||||||
if (lastExecutionTime >= deadline) {
|
if (lastExecutionTime >= deadline) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -366,7 +364,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
|
|
||||||
task = pollTask();
|
task = pollTask();
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
lastExecutionTime = nanoTime();
|
lastExecutionTime = ScheduledFutureTask.nanoTime();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -395,7 +393,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
* checks.
|
* checks.
|
||||||
*/
|
*/
|
||||||
protected void updateLastExecutionTime() {
|
protected void updateLastExecutionTime() {
|
||||||
lastExecutionTime = nanoTime();
|
lastExecutionTime = ScheduledFutureTask.nanoTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -416,11 +414,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean inEventLoop() {
|
|
||||||
return inEventLoop(Thread.currentThread());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean inEventLoop(Thread thread) {
|
public boolean inEventLoop(Thread thread) {
|
||||||
return thread == this.thread;
|
return thread == this.thread;
|
||||||
@ -476,7 +469,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (ran) {
|
if (ran) {
|
||||||
lastExecutionTime = nanoTime();
|
lastExecutionTime = ScheduledFutureTask.nanoTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
return ran;
|
return ran;
|
||||||
@ -602,7 +595,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
cancelDelayedTasks();
|
cancelDelayedTasks();
|
||||||
|
|
||||||
if (gracefulShutdownStartTime == 0) {
|
if (gracefulShutdownStartTime == 0) {
|
||||||
gracefulShutdownStartTime = nanoTime();
|
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (runAllTasks() || runShutdownHooks()) {
|
if (runAllTasks() || runShutdownHooks()) {
|
||||||
@ -616,7 +609,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
final long nanoTime = nanoTime();
|
final long nanoTime = ScheduledFutureTask.nanoTime();
|
||||||
|
|
||||||
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
|
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
|
||||||
return true;
|
return true;
|
||||||
@ -701,15 +694,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
// ScheduledExecutorService implementation
|
// ScheduledExecutorService implementation
|
||||||
|
|
||||||
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||||
private static final long START_TIME = System.nanoTime();
|
|
||||||
private static final AtomicLong nextTaskId = new AtomicLong();
|
|
||||||
private static long nanoTime() {
|
|
||||||
return System.nanoTime() - START_TIME;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static long deadlineNanos(long delay) {
|
|
||||||
return nanoTime() + delay;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||||
@ -723,7 +707,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
String.format("delay: %d (expected: >= 0)", delay));
|
String.format("delay: %d (expected: >= 0)", delay));
|
||||||
}
|
}
|
||||||
return schedule(new ScheduledFutureTask<Void>(this, command, null, deadlineNanos(unit.toNanos(delay))));
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
|
this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -738,7 +723,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
String.format("delay: %d (expected: >= 0)", delay));
|
String.format("delay: %d (expected: >= 0)", delay));
|
||||||
}
|
}
|
||||||
return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
|
return schedule(new ScheduledFutureTask<V>(
|
||||||
|
this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -759,8 +745,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
this, Executors.<Void>callable(command, null),
|
this, delayedTaskQueue, Executors.<Void>callable(command, null),
|
||||||
deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -781,8 +767,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
this, Executors.<Void>callable(command, null),
|
this, delayedTaskQueue, Executors.<Void>callable(command, null),
|
||||||
deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
||||||
@ -809,115 +795,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
|
|||||||
if (state == ST_NOT_STARTED) {
|
if (state == ST_NOT_STARTED) {
|
||||||
state = ST_STARTED;
|
state = ST_STARTED;
|
||||||
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
|
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
|
||||||
this, Executors.<Void>callable(new PurgeTask(), null),
|
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
|
||||||
deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
|
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
|
||||||
thread.start();
|
thread.start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
|
|
||||||
private static final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
|
|
||||||
|
|
||||||
private final long id = nextTaskId.getAndIncrement();
|
|
||||||
private long deadlineNanos;
|
|
||||||
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
|
||||||
private final long periodNanos;
|
|
||||||
|
|
||||||
ScheduledFutureTask(SingleThreadEventExecutor executor, Runnable runnable, V result, long nanoTime) {
|
|
||||||
this(executor, Executors.callable(runnable, result), nanoTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
ScheduledFutureTask(SingleThreadEventExecutor executor, Callable<V> callable, long nanoTime, long period) {
|
|
||||||
super(executor, callable);
|
|
||||||
if (period == 0) {
|
|
||||||
throw new IllegalArgumentException("period: 0 (expected: != 0)");
|
|
||||||
}
|
|
||||||
deadlineNanos = nanoTime;
|
|
||||||
periodNanos = period;
|
|
||||||
}
|
|
||||||
|
|
||||||
ScheduledFutureTask(SingleThreadEventExecutor executor, Callable<V> callable, long nanoTime) {
|
|
||||||
super(executor, callable);
|
|
||||||
deadlineNanos = nanoTime;
|
|
||||||
periodNanos = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected SingleThreadEventExecutor executor() {
|
|
||||||
return (SingleThreadEventExecutor) super.executor();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long deadlineNanos() {
|
|
||||||
return deadlineNanos;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long delayNanos() {
|
|
||||||
return Math.max(0, deadlineNanos() - nanoTime());
|
|
||||||
}
|
|
||||||
|
|
||||||
public long delayNanos(long currentTimeNanos) {
|
|
||||||
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getDelay(TimeUnit unit) {
|
|
||||||
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareTo(Delayed o) {
|
|
||||||
if (this == o) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
|
|
||||||
long d = deadlineNanos() - that.deadlineNanos();
|
|
||||||
if (d < 0) {
|
|
||||||
return -1;
|
|
||||||
} else if (d > 0) {
|
|
||||||
return 1;
|
|
||||||
} else if (id < that.id) {
|
|
||||||
return -1;
|
|
||||||
} else if (id == that.id) {
|
|
||||||
throw new Error();
|
|
||||||
} else {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
assert executor().inEventLoop();
|
|
||||||
try {
|
|
||||||
if (periodNanos == 0) {
|
|
||||||
if (setUncancellableInternal()) {
|
|
||||||
V result = task.call();
|
|
||||||
setSuccessInternal(result);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// check if is done as it may was cancelled
|
|
||||||
if (!isCancelled()) {
|
|
||||||
task.call();
|
|
||||||
if (!executor().isShutdown()) {
|
|
||||||
long p = periodNanos;
|
|
||||||
if (p > 0) {
|
|
||||||
deadlineNanos += p;
|
|
||||||
} else {
|
|
||||||
deadlineNanos = nanoTime() - p;
|
|
||||||
}
|
|
||||||
if (!isCancelled()) {
|
|
||||||
executor().delayedTaskQueue.add(this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Throwable cause) {
|
|
||||||
setFailureInternal(cause);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final class PurgeTask implements Runnable {
|
private final class PurgeTask implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -0,0 +1,112 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2013 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util.concurrent;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.*;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class GlobalEventExecutorTest {
|
||||||
|
|
||||||
|
private static final GlobalEventExecutor e = GlobalEventExecutor.INSTANCE;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
// Wait until the global executor is stopped (just in case there is a task running due to previous test cases)
|
||||||
|
for (;;) {
|
||||||
|
if (e.thread == null || !e.thread.isAlive()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(50);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutomaticStartStop() throws Exception {
|
||||||
|
final TestRunnable task = new TestRunnable(500);
|
||||||
|
e.execute(task);
|
||||||
|
|
||||||
|
// Ensure the new thread has started.
|
||||||
|
Thread thread = e.thread;
|
||||||
|
assertThat(thread, is(not(nullValue())));
|
||||||
|
assertThat(thread.isAlive(), is(true));
|
||||||
|
|
||||||
|
Thread.sleep(1500);
|
||||||
|
|
||||||
|
// Ensure the thread stopped itself after running the task.
|
||||||
|
assertThat(thread.isAlive(), is(false));
|
||||||
|
assertThat(task.ran.get(), is(true));
|
||||||
|
assertThat(e.thread, sameInstance(thread));
|
||||||
|
|
||||||
|
// Ensure another new thread starts again.
|
||||||
|
task.ran.set(false);
|
||||||
|
e.execute(task);
|
||||||
|
assertThat(e.thread, not(sameInstance(thread)));
|
||||||
|
thread = e.thread;
|
||||||
|
|
||||||
|
Thread.sleep(1500);
|
||||||
|
|
||||||
|
// Ensure the thread stopped itself after running the task.
|
||||||
|
assertThat(thread.isAlive(), is(false));
|
||||||
|
assertThat(task.ran.get(), is(true));
|
||||||
|
assertThat(e.thread, sameInstance(thread));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduledTasks() throws Exception {
|
||||||
|
TestRunnable task = new TestRunnable(0);
|
||||||
|
ScheduledFuture<?> f = e.schedule(task, 1500, TimeUnit.MILLISECONDS);
|
||||||
|
f.sync();
|
||||||
|
assertThat(task.ran.get(), is(true));
|
||||||
|
|
||||||
|
// Ensure the thread is still running.
|
||||||
|
Thread thread = e.thread;
|
||||||
|
assertThat(thread, is(not(nullValue())));
|
||||||
|
assertThat(thread.isAlive(), is(true));
|
||||||
|
|
||||||
|
Thread.sleep(1500);
|
||||||
|
|
||||||
|
// Not it should be stopped.
|
||||||
|
assertThat(thread.isAlive(), is(false));
|
||||||
|
assertThat(e.thread, sameInstance(thread));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class TestRunnable implements Runnable {
|
||||||
|
final AtomicBoolean ran = new AtomicBoolean();
|
||||||
|
final long delay;
|
||||||
|
|
||||||
|
TestRunnable(long delay) {
|
||||||
|
this.delay = delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(delay);
|
||||||
|
ran.set(true);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user