[#473] Fix elevated context switching in SingleThreadEventExecutor
- Remove polling in SingleThreadEventExecutor - Create a dedicated scheduled task scheduler called 'TaskScheduler' - TaskScheduler is created per EventLoopGroup / EventExecutorGroup - SingleThreadEventExecutor delegates all scheduled execution requests to TaskScheduler provided as a constructor parameter - TaskScheduler is a specialized form of single threaded ScheduledExecutorService which requires an EventExecutor as a parameter for all requests.
This commit is contained in:
parent
505e767a09
commit
421eabe666
@ -19,8 +19,9 @@ import java.util.concurrent.ThreadFactory;
|
||||
|
||||
class DefaultEventExecutor extends SingleThreadEventExecutor {
|
||||
|
||||
DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) {
|
||||
super(parent, threadFactory);
|
||||
DefaultEventExecutor(
|
||||
DefaultEventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
|
||||
super(parent, threadFactory, scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -28,7 +28,8 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
|
||||
return new DefaultEventExecutor(this, threadFactory);
|
||||
protected EventExecutor newChild(
|
||||
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
|
||||
return new DefaultEventExecutor(this, threadFactory, scheduler);
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
|
||||
private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
|
||||
private static final AtomicInteger poolId = new AtomicInteger();
|
||||
|
||||
final TaskScheduler scheduler;
|
||||
private final EventExecutor[] children;
|
||||
private final AtomicInteger childIndex = new AtomicInteger();
|
||||
|
||||
@ -40,11 +41,13 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
|
||||
threadFactory = new DefaultThreadFactory();
|
||||
}
|
||||
|
||||
scheduler = new TaskScheduler(threadFactory);
|
||||
|
||||
children = new SingleThreadEventExecutor[nThreads];
|
||||
for (int i = 0; i < nThreads; i ++) {
|
||||
boolean success = false;
|
||||
try {
|
||||
children[i] = newChild(threadFactory, args);
|
||||
children[i] = newChild(threadFactory, scheduler, args);
|
||||
success = true;
|
||||
} catch (Exception e) {
|
||||
throw new EventLoopException("failed to create a child event loop", e);
|
||||
@ -63,10 +66,12 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
|
||||
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
|
||||
}
|
||||
|
||||
protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception;
|
||||
protected abstract EventExecutor newChild(
|
||||
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception;
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
scheduler.shutdown();
|
||||
for (EventExecutor l: children) {
|
||||
l.shutdown();
|
||||
}
|
||||
@ -74,6 +79,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
|
||||
|
||||
@Override
|
||||
public boolean isShutdown() {
|
||||
if (!scheduler.isShutdown()) {
|
||||
return false;
|
||||
}
|
||||
for (EventExecutor l: children) {
|
||||
if (!l.isShutdown()) {
|
||||
return false;
|
||||
@ -84,6 +92,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
|
||||
|
||||
@Override
|
||||
public boolean isTerminated() {
|
||||
if (!scheduler.isTerminated()) {
|
||||
return false;
|
||||
}
|
||||
for (EventExecutor l: children) {
|
||||
if (!l.isTerminated()) {
|
||||
return false;
|
||||
@ -96,6 +107,15 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
long deadline = System.nanoTime() + unit.toNanos(timeout);
|
||||
for (;;) {
|
||||
long timeLeft = deadline - System.nanoTime();
|
||||
if (timeLeft <= 0) {
|
||||
return isTerminated();
|
||||
}
|
||||
if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
loop: for (EventExecutor l: children) {
|
||||
for (;;) {
|
||||
long timeLeft = deadline - System.nanoTime();
|
||||
|
@ -20,35 +20,24 @@ import io.netty.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.AbstractExecutorService;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor {
|
||||
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
|
||||
|
||||
private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10);
|
||||
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||
private static final long START_TIME = System.nanoTime();
|
||||
private static final AtomicLong nextTaskId = new AtomicLong();
|
||||
|
||||
static final ThreadLocal<SingleThreadEventExecutor> CURRENT_EVENT_LOOP =
|
||||
new ThreadLocal<SingleThreadEventExecutor>();
|
||||
|
||||
@ -56,33 +45,27 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
|
||||
return CURRENT_EVENT_LOOP.get();
|
||||
}
|
||||
|
||||
private static long nanoTime() {
|
||||
return System.nanoTime() - START_TIME;
|
||||
}
|
||||
|
||||
private static long deadlineNanos(long delay) {
|
||||
return nanoTime() + delay;
|
||||
}
|
||||
|
||||
private final EventExecutorGroup parent;
|
||||
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
|
||||
private final Thread thread;
|
||||
private final Object stateLock = new Object();
|
||||
private final Semaphore threadLock = new Semaphore(0);
|
||||
// TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue.
|
||||
private final Queue<ScheduledFutureTask<?>> scheduledTasks = new DelayQueue<ScheduledFutureTask<?>>();
|
||||
private final TaskScheduler scheduler;
|
||||
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
|
||||
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
|
||||
private volatile int state;
|
||||
private long lastCheckTimeNanos;
|
||||
private long lastPurgeTimeNanos;
|
||||
|
||||
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
|
||||
protected SingleThreadEventExecutor(
|
||||
EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
|
||||
if (threadFactory == null) {
|
||||
throw new NullPointerException("threadFactory");
|
||||
}
|
||||
if (scheduler == null) {
|
||||
throw new NullPointerException("scheduler");
|
||||
}
|
||||
|
||||
this.parent = parent;
|
||||
this.scheduler = scheduler;
|
||||
|
||||
thread = threadFactory.newThread(new Runnable() {
|
||||
@Override
|
||||
@ -115,7 +98,6 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
|
||||
private void cleanupTasks() {
|
||||
for (;;) {
|
||||
boolean ran = false;
|
||||
cancelScheduledTasks();
|
||||
ran |= runAllTasks();
|
||||
ran |= runShutdownHooks();
|
||||
if (!ran && !hasTasks()) {
|
||||
@ -142,67 +124,24 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
|
||||
|
||||
protected Runnable pollTask() {
|
||||
assert inEventLoop();
|
||||
|
||||
Runnable task = taskQueue.poll();
|
||||
if (task != null) {
|
||||
return task;
|
||||
}
|
||||
|
||||
if (fetchScheduledTasks()) {
|
||||
task = taskQueue.poll();
|
||||
return task;
|
||||
}
|
||||
|
||||
return null;
|
||||
return taskQueue.poll();
|
||||
}
|
||||
|
||||
protected Runnable takeTask() throws InterruptedException {
|
||||
assert inEventLoop();
|
||||
|
||||
for (;;) {
|
||||
Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS);
|
||||
if (task != null) {
|
||||
return task;
|
||||
}
|
||||
fetchScheduledTasks();
|
||||
task = taskQueue.poll();
|
||||
if (task != null) {
|
||||
return task;
|
||||
}
|
||||
}
|
||||
return taskQueue.take();
|
||||
}
|
||||
|
||||
protected Runnable peekTask() {
|
||||
assert inEventLoop();
|
||||
|
||||
Runnable task = taskQueue.peek();
|
||||
if (task != null) {
|
||||
return task;
|
||||
}
|
||||
|
||||
if (fetchScheduledTasks()) {
|
||||
task = taskQueue.peek();
|
||||
return task;
|
||||
}
|
||||
|
||||
return null;
|
||||
return taskQueue.peek();
|
||||
}
|
||||
|
||||
protected boolean hasTasks() {
|
||||
assert inEventLoop();
|
||||
|
||||
boolean empty = taskQueue.isEmpty();
|
||||
if (!empty) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (fetchScheduledTasks()) {
|
||||
return !taskQueue.isEmpty();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void addTask(Runnable task) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
@ -397,228 +336,21 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (delay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: >= 0)", delay));
|
||||
}
|
||||
return schedule(new ScheduledFutureTask<Void>(command, null, deadlineNanos(unit.toNanos(delay))));
|
||||
return scheduler.schedule(this, command, delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
if (callable == null) {
|
||||
throw new NullPointerException("callable");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (delay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: >= 0)", delay));
|
||||
}
|
||||
return schedule(new ScheduledFutureTask<V>(callable, deadlineNanos(unit.toNanos(delay))));
|
||||
return scheduler.schedule(this, callable, delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (initialDelay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||
}
|
||||
if (period <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("period: %d (expected: > 0)", period));
|
||||
}
|
||||
|
||||
return schedule(new ScheduledFutureTask<Void>(
|
||||
command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
||||
return scheduler.scheduleAtFixedRate(this, command, initialDelay, period, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (initialDelay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||
}
|
||||
if (delay <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: > 0)", delay));
|
||||
}
|
||||
|
||||
return schedule(new ScheduledFutureTask<Void>(
|
||||
command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
||||
}
|
||||
|
||||
private <V> ScheduledFuture<V> schedule(ScheduledFutureTask<V> task) {
|
||||
if (isShutdown()) {
|
||||
reject();
|
||||
}
|
||||
scheduledTasks.add(task);
|
||||
if (isShutdown()) {
|
||||
task.cancel(false);
|
||||
}
|
||||
|
||||
if (!inEventLoop()) {
|
||||
synchronized (stateLock) {
|
||||
if (state == 0) {
|
||||
state = 1;
|
||||
thread.start();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fetchScheduledTasks();
|
||||
}
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
private boolean fetchScheduledTasks() {
|
||||
if (scheduledTasks.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
long nanoTime = nanoTime();
|
||||
if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) {
|
||||
for (Iterator<ScheduledFutureTask<?>> i = scheduledTasks.iterator(); i.hasNext();) {
|
||||
ScheduledFutureTask<?> task = i.next();
|
||||
if (task.isCancelled()) {
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) {
|
||||
boolean added = false;
|
||||
for (;;) {
|
||||
ScheduledFutureTask<?> task = scheduledTasks.poll();
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!task.isCancelled()) {
|
||||
if (isShutdown()) {
|
||||
task.cancel(false);
|
||||
} else {
|
||||
taskQueue.add(task);
|
||||
added = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return added;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private void cancelScheduledTasks() {
|
||||
if (scheduledTasks.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (ScheduledFutureTask<?> task: scheduledTasks.toArray(new ScheduledFutureTask<?>[scheduledTasks.size()])) {
|
||||
task.cancel(false);
|
||||
}
|
||||
scheduledTasks.clear();
|
||||
}
|
||||
|
||||
private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
|
||||
|
||||
private final long id = nextTaskId.getAndIncrement();
|
||||
private long deadlineNanos;
|
||||
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
||||
private final long periodNanos;
|
||||
|
||||
ScheduledFutureTask(Runnable runnable, V result, long nanoTime) {
|
||||
super(runnable, result);
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = 0;
|
||||
}
|
||||
|
||||
ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) {
|
||||
super(runnable, result);
|
||||
if (period == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("period: %d (expected: != 0)", period));
|
||||
}
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = period;
|
||||
}
|
||||
|
||||
ScheduledFutureTask(Callable<V> callable, long nanoTime) {
|
||||
super(callable);
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = 0;
|
||||
}
|
||||
|
||||
public long deadlineNanos() {
|
||||
return deadlineNanos;
|
||||
}
|
||||
|
||||
public long delayNanos() {
|
||||
return Math.max(0, deadlineNanos() - nanoTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed o) {
|
||||
if (this == o) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
|
||||
long d = deadlineNanos() - that.deadlineNanos();
|
||||
if (d < 0) {
|
||||
return -1;
|
||||
} else if (d > 0) {
|
||||
return 1;
|
||||
} else if (id < that.id) {
|
||||
return -1;
|
||||
} else if (id == that.id) {
|
||||
throw new Error();
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (periodNanos == 0) {
|
||||
super.run();
|
||||
} else {
|
||||
boolean reset = runAndReset();
|
||||
if (reset && !isShutdown()) {
|
||||
long p = periodNanos;
|
||||
if (p > 0) {
|
||||
deadlineNanos += p;
|
||||
} else {
|
||||
deadlineNanos = nanoTime() - p;
|
||||
}
|
||||
|
||||
schedule(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
return scheduler.scheduleWithFixedDelay(this, command, initialDelay, delay, unit);
|
||||
}
|
||||
}
|
||||
|
@ -19,8 +19,9 @@ import java.util.concurrent.ThreadFactory;
|
||||
|
||||
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
|
||||
|
||||
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) {
|
||||
super(parent, threadFactory);
|
||||
protected SingleThreadEventLoop(
|
||||
EventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
|
||||
super(parent, threadFactory, scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
426
transport/src/main/java/io/netty/channel/TaskScheduler.java
Normal file
426
transport/src/main/java/io/netty/channel/TaskScheduler.java
Normal file
@ -0,0 +1,426 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.logging.InternalLogger;
|
||||
import io.netty.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public final class TaskScheduler {
|
||||
|
||||
private static final InternalLogger logger =
|
||||
InternalLoggerFactory.getInstance(TaskScheduler.class);
|
||||
|
||||
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||
private static final long START_TIME = System.nanoTime();
|
||||
private static final AtomicLong nextTaskId = new AtomicLong();
|
||||
|
||||
private static long nanoTime() {
|
||||
return System.nanoTime() - START_TIME;
|
||||
}
|
||||
|
||||
private static long deadlineNanos(long delay) {
|
||||
return nanoTime() + delay;
|
||||
}
|
||||
|
||||
private final BlockingQueue<ScheduledFutureTask<?>> taskQueue = new DelayQueue<ScheduledFutureTask<?>>();
|
||||
private final Thread thread;
|
||||
private final Object stateLock = new Object();
|
||||
private final Semaphore threadLock = new Semaphore(0);
|
||||
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
|
||||
private volatile int state;
|
||||
|
||||
public TaskScheduler(ThreadFactory threadFactory) {
|
||||
if (threadFactory == null) {
|
||||
throw new NullPointerException("threadFactory");
|
||||
}
|
||||
|
||||
thread = threadFactory.newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
for (;;) {
|
||||
ScheduledFutureTask<?> task;
|
||||
try {
|
||||
task = taskQueue.take();
|
||||
runTask(task);
|
||||
} catch (InterruptedException e) {
|
||||
// Waken up by interruptThread()
|
||||
}
|
||||
|
||||
if (isShutdown() && taskQueue.peek() == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
// Run all remaining tasks and shutdown hooks.
|
||||
try {
|
||||
cleanupTasks();
|
||||
} finally {
|
||||
synchronized (stateLock) {
|
||||
state = 3;
|
||||
}
|
||||
}
|
||||
cleanupTasks();
|
||||
} finally {
|
||||
threadLock.release();
|
||||
assert taskQueue.isEmpty();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void runTask(ScheduledFutureTask<?> task) {
|
||||
EventExecutor executor = task.executor;
|
||||
if (executor == null) {
|
||||
task.run();
|
||||
} else {
|
||||
if (executor.isShutdown()) {
|
||||
task.cancel(false);
|
||||
} else {
|
||||
try {
|
||||
task.executor.execute(task);
|
||||
} catch (RejectedExecutionException e) {
|
||||
task.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupTasks() {
|
||||
for (;;) {
|
||||
boolean ran = false;
|
||||
cancelScheduledTasks();
|
||||
for (;;) {
|
||||
final ScheduledFutureTask<?> task = taskQueue.poll();
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
runTask(task);
|
||||
ran = true;
|
||||
} catch (Throwable t) {
|
||||
logger.warn("A task raised an exception.", t);
|
||||
}
|
||||
}
|
||||
|
||||
if (!ran && taskQueue.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private boolean inSameThread() {
|
||||
return Thread.currentThread() == thread;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
boolean inSameThread = inSameThread();
|
||||
boolean wakeup = false;
|
||||
if (inSameThread) {
|
||||
synchronized (stateLock) {
|
||||
assert state == 1;
|
||||
state = 2;
|
||||
wakeup = true;
|
||||
}
|
||||
} else {
|
||||
synchronized (stateLock) {
|
||||
switch (state) {
|
||||
case 0:
|
||||
state = 3;
|
||||
threadLock.release();
|
||||
break;
|
||||
case 1:
|
||||
state = 2;
|
||||
wakeup = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (wakeup && !inSameThread && isShutdown()) {
|
||||
thread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isShutdown() {
|
||||
return state >= 2;
|
||||
}
|
||||
|
||||
public boolean isTerminated() {
|
||||
return state == 3;
|
||||
}
|
||||
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
|
||||
if (inSameThread()) {
|
||||
throw new IllegalStateException("cannot await termination of the current thread");
|
||||
}
|
||||
|
||||
if (threadLock.tryAcquire(timeout, unit)) {
|
||||
threadLock.release();
|
||||
}
|
||||
|
||||
return isTerminated();
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> schedule(
|
||||
EventExecutor executor, Runnable command, long delay, TimeUnit unit) {
|
||||
if (executor == null) {
|
||||
throw new NullPointerException("executor");
|
||||
}
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (delay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: >= 0)", delay));
|
||||
}
|
||||
return schedule(new ScheduledFutureTask<Void>(executor, command, null, deadlineNanos(unit.toNanos(delay))));
|
||||
}
|
||||
|
||||
public <V> ScheduledFuture<V> schedule(
|
||||
EventExecutor executor, Callable<V> callable, long delay, TimeUnit unit) {
|
||||
if (executor == null) {
|
||||
throw new NullPointerException("executor");
|
||||
}
|
||||
if (callable == null) {
|
||||
throw new NullPointerException("callable");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (delay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: >= 0)", delay));
|
||||
}
|
||||
return schedule(new ScheduledFutureTask<V>(executor, callable, deadlineNanos(unit.toNanos(delay))));
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(
|
||||
EventExecutor executor, Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
if (executor == null) {
|
||||
throw new NullPointerException("executor");
|
||||
}
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (initialDelay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||
}
|
||||
if (period <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("period: %d (expected: > 0)", period));
|
||||
}
|
||||
|
||||
return schedule(new ScheduledFutureTask<Void>(
|
||||
executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(
|
||||
EventExecutor executor, Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
if (executor == null) {
|
||||
throw new NullPointerException("executor");
|
||||
}
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (initialDelay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||
}
|
||||
if (delay <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: > 0)", delay));
|
||||
}
|
||||
|
||||
return schedule(new ScheduledFutureTask<Void>(
|
||||
executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
||||
}
|
||||
|
||||
private <V> ScheduledFuture<V> schedule(ScheduledFutureTask<V> task) {
|
||||
if (isShutdown()) {
|
||||
reject();
|
||||
}
|
||||
taskQueue.add(task);
|
||||
if (isShutdown()) {
|
||||
task.cancel(false);
|
||||
}
|
||||
|
||||
boolean started = false;
|
||||
if (!inSameThread()) {
|
||||
synchronized (stateLock) {
|
||||
if (state == 0) {
|
||||
state = 1;
|
||||
thread.start();
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (started) {
|
||||
schedule(new ScheduledFutureTask<Void>(
|
||||
null, new PurgeTask(), null,
|
||||
deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
|
||||
}
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
private static void reject() {
|
||||
throw new RejectedExecutionException("event executor shut down");
|
||||
}
|
||||
|
||||
private void cancelScheduledTasks() {
|
||||
if (taskQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (ScheduledFutureTask<?> task: taskQueue.toArray(new ScheduledFutureTask<?>[taskQueue.size()])) {
|
||||
task.cancel(false);
|
||||
}
|
||||
|
||||
taskQueue.clear();
|
||||
}
|
||||
|
||||
private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
|
||||
|
||||
private final EventExecutor executor;
|
||||
private final long id = nextTaskId.getAndIncrement();
|
||||
private long deadlineNanos;
|
||||
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
||||
private final long periodNanos;
|
||||
|
||||
ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime) {
|
||||
super(runnable, result);
|
||||
this.executor = executor;
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = 0;
|
||||
}
|
||||
|
||||
ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime, long period) {
|
||||
super(runnable, result);
|
||||
if (period == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("period: %d (expected: != 0)", period));
|
||||
}
|
||||
this.executor = executor;
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = period;
|
||||
}
|
||||
|
||||
ScheduledFutureTask(EventExecutor executor, Callable<V> callable, long nanoTime) {
|
||||
super(callable);
|
||||
this.executor = executor;
|
||||
deadlineNanos = nanoTime;
|
||||
periodNanos = 0;
|
||||
}
|
||||
|
||||
public long deadlineNanos() {
|
||||
return deadlineNanos;
|
||||
}
|
||||
|
||||
public long delayNanos() {
|
||||
return Math.max(0, deadlineNanos() - nanoTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed o) {
|
||||
if (this == o) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
|
||||
long d = deadlineNanos() - that.deadlineNanos();
|
||||
if (d < 0) {
|
||||
return -1;
|
||||
} else if (d > 0) {
|
||||
return 1;
|
||||
} else if (id < that.id) {
|
||||
return -1;
|
||||
} else if (id == that.id) {
|
||||
throw new Error();
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (periodNanos == 0) {
|
||||
super.run();
|
||||
} else {
|
||||
boolean reset = runAndReset();
|
||||
if (reset && !isShutdown()) {
|
||||
long p = periodNanos;
|
||||
if (p > 0) {
|
||||
deadlineNanos += p;
|
||||
} else {
|
||||
deadlineNanos = nanoTime() - p;
|
||||
}
|
||||
|
||||
schedule(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class PurgeTask implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
Iterator<ScheduledFutureTask<?>> i = taskQueue.iterator();
|
||||
while (i.hasNext()) {
|
||||
ScheduledFutureTask<?> task = i.next();
|
||||
if (task.isCancelled()) {
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -16,13 +16,15 @@
|
||||
package io.netty.channel.local;
|
||||
|
||||
import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.channel.TaskScheduler;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
final class LocalEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) {
|
||||
super(parent, threadFactory);
|
||||
LocalEventLoop(
|
||||
LocalEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
|
||||
super(parent, threadFactory, scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.local;
|
||||
|
||||
import io.netty.channel.EventExecutor;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.TaskScheduler;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
@ -35,7 +36,8 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
|
||||
return new LocalEventLoop(this, threadFactory);
|
||||
protected EventExecutor newChild(
|
||||
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
|
||||
return new LocalEventLoop(this, threadFactory, scheduler);
|
||||
}
|
||||
}
|
||||
|
@ -68,7 +68,7 @@ abstract class AbstractAioChannel extends AbstractChannel {
|
||||
|
||||
@Override
|
||||
protected Runnable doRegister() throws Exception {
|
||||
if (((AioChildEventLoop) eventLoop()).parent() != group) {
|
||||
if (((AioEventLoop) eventLoop()).parent() != group) {
|
||||
throw new ChannelException(
|
||||
getClass().getSimpleName() + " must be registered to the " +
|
||||
AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor.");
|
||||
@ -83,7 +83,7 @@ abstract class AbstractAioChannel extends AbstractChannel {
|
||||
|
||||
@Override
|
||||
protected boolean isCompatible(EventLoop loop) {
|
||||
return loop instanceof AioChildEventLoop;
|
||||
return loop instanceof AioEventLoop;
|
||||
}
|
||||
|
||||
protected abstract class AbstractAioUnsafe extends AbstractUnsafe {
|
||||
|
@ -16,13 +16,14 @@
|
||||
package io.netty.channel.socket.aio;
|
||||
|
||||
import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.channel.TaskScheduler;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
final class AioChildEventLoop extends SingleThreadEventLoop {
|
||||
final class AioEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
AioChildEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) {
|
||||
super(parent, threadFactory);
|
||||
AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
|
||||
super(parent, threadFactory, scheduler);
|
||||
}
|
||||
|
||||
@Override
|
@ -18,6 +18,7 @@ package io.netty.channel.socket.aio;
|
||||
import io.netty.channel.EventExecutor;
|
||||
import io.netty.channel.EventLoopException;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.TaskScheduler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
@ -57,8 +58,9 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
|
||||
return new AioChildEventLoop(this, threadFactory);
|
||||
protected EventExecutor newChild(
|
||||
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
|
||||
return new AioEventLoop(this, threadFactory, scheduler);
|
||||
}
|
||||
|
||||
private void executeAioTask(Runnable command) {
|
||||
|
@ -18,6 +18,7 @@ package io.netty.channel.socket.nio;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.channel.TaskScheduler;
|
||||
import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe;
|
||||
import io.netty.logging.InternalLogger;
|
||||
import io.netty.logging.InternalLoggerFactory;
|
||||
@ -60,8 +61,10 @@ final class NioEventLoop extends SingleThreadEventLoop {
|
||||
private int cancelledKeys;
|
||||
private boolean cleanedCancelledKeys;
|
||||
|
||||
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
|
||||
super(parent, threadFactory);
|
||||
NioEventLoop(
|
||||
NioEventLoopGroup parent, ThreadFactory threadFactory,
|
||||
TaskScheduler scheduler, SelectorProvider selectorProvider) {
|
||||
super(parent, threadFactory, scheduler);
|
||||
if (selectorProvider == null) {
|
||||
throw new NullPointerException("selectorProvider");
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.socket.nio;
|
||||
|
||||
import io.netty.channel.EventExecutor;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.TaskScheduler;
|
||||
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
@ -35,18 +36,20 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
super(nThreads, threadFactory);
|
||||
}
|
||||
|
||||
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
|
||||
public NioEventLoopGroup(
|
||||
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
|
||||
super(nThreads, threadFactory, selectorProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
|
||||
protected EventExecutor newChild(
|
||||
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
|
||||
SelectorProvider selectorProvider;
|
||||
if (args == null || args.length == 0 || args[0] == null) {
|
||||
selectorProvider = SelectorProvider.provider();
|
||||
} else {
|
||||
selectorProvider = (SelectorProvider) args[0];
|
||||
}
|
||||
return new NioEventLoop(this, threadFactory, selectorProvider);
|
||||
return new NioEventLoop(this, threadFactory, scheduler, selectorProvider);
|
||||
}
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ class OioEventLoop extends SingleThreadEventLoop {
|
||||
private AbstractOioChannel ch;
|
||||
|
||||
OioEventLoop(OioEventLoopGroup parent) {
|
||||
super(parent, parent.threadFactory);
|
||||
super(parent, parent.threadFactory, parent.scheduler);
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.TaskScheduler;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Queue;
|
||||
@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public class OioEventLoopGroup implements EventLoopGroup {
|
||||
|
||||
private final int maxChannels;
|
||||
final TaskScheduler scheduler;
|
||||
final ThreadFactory threadFactory;
|
||||
final Set<OioEventLoop> activeChildren = Collections.newSetFromMap(
|
||||
new ConcurrentHashMap<OioEventLoop, Boolean>());
|
||||
@ -60,6 +62,8 @@ public class OioEventLoopGroup implements EventLoopGroup {
|
||||
this.maxChannels = maxChannels;
|
||||
this.threadFactory = threadFactory;
|
||||
|
||||
scheduler = new TaskScheduler(threadFactory);
|
||||
|
||||
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
|
||||
tooManyChannels.setStackTrace(new StackTraceElement[0]);
|
||||
}
|
||||
@ -71,6 +75,7 @@ public class OioEventLoopGroup implements EventLoopGroup {
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
scheduler.shutdown();
|
||||
for (EventLoop l: activeChildren) {
|
||||
l.shutdown();
|
||||
}
|
||||
@ -81,6 +86,9 @@ public class OioEventLoopGroup implements EventLoopGroup {
|
||||
|
||||
@Override
|
||||
public boolean isShutdown() {
|
||||
if (!scheduler.isShutdown()) {
|
||||
return false;
|
||||
}
|
||||
for (EventLoop l: activeChildren) {
|
||||
if (!l.isShutdown()) {
|
||||
return false;
|
||||
@ -96,6 +104,9 @@ public class OioEventLoopGroup implements EventLoopGroup {
|
||||
|
||||
@Override
|
||||
public boolean isTerminated() {
|
||||
if (!scheduler.isTerminated()) {
|
||||
return false;
|
||||
}
|
||||
for (EventLoop l: activeChildren) {
|
||||
if (!l.isTerminated()) {
|
||||
return false;
|
||||
@ -113,6 +124,15 @@ public class OioEventLoopGroup implements EventLoopGroup {
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
long deadline = System.nanoTime() + unit.toNanos(timeout);
|
||||
for (;;) {
|
||||
long timeLeft = deadline - System.nanoTime();
|
||||
if (timeLeft <= 0) {
|
||||
return isTerminated();
|
||||
}
|
||||
if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (EventLoop l: activeChildren) {
|
||||
for (;;) {
|
||||
long timeLeft = deadline - System.nanoTime();
|
||||
|
@ -256,7 +256,8 @@ public class SingleThreadEventLoopTest {
|
||||
final AtomicInteger cleanedUp = new AtomicInteger();
|
||||
|
||||
SingleThreadEventLoopImpl() {
|
||||
super(null, Executors.defaultThreadFactory());
|
||||
super(null, Executors.defaultThreadFactory(),
|
||||
new TaskScheduler(Executors.defaultThreadFactory()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user