Make SingleThreadEventExecutor independent from TaskScheduler

- Related issue: #817
This commit is contained in:
Trustin Lee 2013-03-22 09:00:38 +09:00
parent 52c4e042d6
commit 4097dee49d
15 changed files with 536 additions and 148 deletions

View File

@ -24,12 +24,8 @@ import java.util.concurrent.ThreadFactory;
*/
final class DefaultEventExecutor extends SingleThreadEventExecutor {
/**
* @see SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, TaskScheduler)
*/
DefaultEventExecutor(
DefaultEventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override

View File

@ -23,23 +23,17 @@ import java.util.concurrent.ThreadFactory;
*/
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
*/
public DefaultEventExecutorGroup(int nThreads) {
this(nThreads, null);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
*/
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory, scheduler);
ThreadFactory threadFactory, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory);
}
}

View File

@ -31,7 +31,6 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
public static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final AtomicInteger poolId = new AtomicInteger();
final TaskScheduler scheduler;
private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger();
@ -41,9 +40,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
* @param nThreads the number of threads that will be used by this instance. Use 0 for the default number
* of {@link #DEFAULT_POOL_SIZE}
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each
* {@link #newChild(ThreadFactory, TaskScheduler, Object...)}
* call.
* @param args arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads < 0) {
@ -58,13 +55,11 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
threadFactory = new DefaultThreadFactory();
}
scheduler = new TaskScheduler(threadFactory);
children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, scheduler, args);
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
@ -99,7 +94,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
*
*/
protected abstract EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception;
ThreadFactory threadFactory, Object... args) throws Exception;
@Override
public void shutdown() {
@ -107,7 +102,6 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
return;
}
scheduler.shutdown();
for (EventExecutor l: children) {
l.shutdown();
}
@ -115,9 +109,6 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
@Override
public boolean isShutdown() {
if (!scheduler.isShutdown()) {
return false;
}
for (EventExecutor l: children) {
if (!l.isShutdown()) {
return false;
@ -128,9 +119,6 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
@Override
public boolean isTerminated() {
if (!scheduler.isTerminated()) {
return false;
}
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
@ -143,15 +131,6 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
loop: for (EventExecutor l: children) {
for (;;) {
long timeLeft = deadline - System.nanoTime();

View File

@ -20,22 +20,30 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
/**
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
*
*/
public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
public abstract class SingleThreadEventExecutor extends AbstractEventExecutorWithoutScheduler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
@ -70,6 +78,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
@ -82,12 +92,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
* @param scheduler the {@link TaskScheduler} which will be used to schedule Tasks for later
* execution
*/
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(scheduler);
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
@ -187,11 +193,55 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
*/
protected Runnable takeTask() throws InterruptedException {
assert inEventLoop();
if (taskQueue instanceof BlockingQueue) {
return ((BlockingQueue<Runnable>) taskQueue).take();
} else {
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
}
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
return taskQueue.take();
} else {
long delayNanos = delayedTask.delayNanos();
Runnable task;
if (delayNanos > 0) {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} else {
task = taskQueue.poll();
}
if (task == null) {
fetchFromDelayedQueue();
task = taskQueue.poll();
}
if (task != null) {
return task;
}
}
}
}
private void fetchFromDelayedQueue() {
long nanoTime = 0L;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
break;
}
if (nanoTime == 0L) {
nanoTime = nanoTime();
}
if (delayedTask.deadlineNanos() <= nanoTime) {
delayedTaskQueue.remove();
taskQueue.add(delayedTask);
} else {
break;
}
}
}
/**
@ -240,6 +290,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* @return {@code true} if and only if at least one task was run
*/
protected boolean runAllTasks() {
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
@ -264,6 +315,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
*/
protected boolean runAllTasks(long timeoutNanos) {
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
@ -297,6 +349,31 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return true;
}
/**
* Returns the ammount of time left until the scheduled task with the closest dead line is executed.
*/
protected long delayNanos() {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return delayedTask.delayNanos();
}
/**
* Returns the ammount of time left until the scheduled task with the closest dead line is executed.
*/
protected long delayMillis() {
long delayNanos = delayNanos();
long delayMillis = delayNanos / 1000000L;
if (delayNanos % 1000000L < 500000L) {
return delayMillis;
} else {
return delayMillis + 1;
}
}
/**
*
*/
@ -437,6 +514,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
throw new IllegalStateException("must be invoked from an event loop");
}
cancelDelayedTasks();
if (runAllTasks() || runShutdownHooks()) {
// There were tasks in the queue. Wait a little bit more until no tasks are queued for SHUTDOWN_DELAY_NANOS.
lastAccessTimeNanos = 0;
@ -466,6 +545,21 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return true;
}
private void cancelDelayedTasks() {
if (delayedTaskQueue.isEmpty()) {
return;
}
final ScheduledFutureTask<?>[] delayedTasks =
delayedTaskQueue.toArray(new ScheduledFutureTask<?>[delayedTaskQueue.size()]);
for (ScheduledFutureTask<?> task: delayedTasks) {
task.cancel(false);
}
delayedTaskQueue.clear();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
@ -493,12 +587,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
addTask(task);
wakeup(true);
} else {
synchronized (stateLock) {
if (state == ST_NOT_STARTED) {
state = ST_STARTED;
thread.start();
}
}
startThread();
addTask(task);
if (isTerminated() && removeTask(task)) {
reject();
@ -510,4 +599,267 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
protected static void reject() {
throw new RejectedExecutionException("event executor terminated");
}
// ScheduledExecutorService implementation
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long START_TIME = System.nanoTime();
private static final AtomicLong nextTaskId = new AtomicLong();
private static long nanoTime() {
return System.nanoTime() - START_TIME;
}
private static long deadlineNanos(long delay) {
return nanoTime() + delay;
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(this, command, null, deadlineNanos(unit.toNanos(delay))));
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null) {
throw new NullPointerException("callable");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(command, null),
deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null) {
throw new NullPointerException("command");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(command, null),
deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (task == null) {
throw new NullPointerException("task");
}
if (inEventLoop()) {
delayedTaskQueue.add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
delayedTaskQueue.add(task);
}
});
}
return task;
}
private void startThread() {
synchronized (stateLock) {
if (state == ST_NOT_STARTED) {
state = ST_STARTED;
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new PurgeTask(), null),
deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
thread.start();
}
}
}
private static final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<ScheduledFutureTask> uncancellableUpdater =
AtomicIntegerFieldUpdater.newUpdater(ScheduledFutureTask.class, "uncancellable");
private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
@SuppressWarnings("UnusedDeclaration")
private volatile int uncancellable;
ScheduledFutureTask(SingleThreadEventExecutor executor, Runnable runnable, V result, long nanoTime) {
this(executor, Executors.callable(runnable, result), nanoTime);
}
ScheduledFutureTask(SingleThreadEventExecutor executor, Callable<V> callable, long nanoTime, long period) {
super(executor, callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
deadlineNanos = nanoTime;
periodNanos = period;
}
ScheduledFutureTask(SingleThreadEventExecutor executor, Callable<V> callable, long nanoTime) {
super(executor, callable);
deadlineNanos = nanoTime;
periodNanos = 0;
}
@Override
protected SingleThreadEventExecutor executor() {
return (SingleThreadEventExecutor) super.executor();
}
public long deadlineNanos() {
return deadlineNanos;
}
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public void run() {
assert executor().inEventLoop();
try {
if (periodNanos == 0) {
if (setUncancellable()) {
V result = task.call();
setSuccessInternal(result);
}
} else {
task.call();
if (!executor().isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
if (!isDone()) {
executor().delayedTaskQueue.add(this);
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}
@Override
public boolean isCancelled() {
if (cause() instanceof CancellationException) {
return true;
}
return false;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!isDone()) {
if (setUncancellable()) {
return tryFailureInternal(new CancellationException());
}
}
return false;
}
private boolean setUncancellable() {
return uncancellableUpdater.compareAndSet(this, 0, 1);
}
}
private final class PurgeTask implements Runnable {
@Override
public void run() {
Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
while (i.hasNext()) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
}
}
}

View File

@ -29,7 +29,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
public final class TaskScheduler {
@ -329,13 +329,18 @@ public final class TaskScheduler {
private static class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<ScheduledFutureTask> uncancellableUpdater =
AtomicIntegerFieldUpdater.newUpdater(ScheduledFutureTask.class, "uncancellable");
private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
private final TaskScheduler scheduler;
private final AtomicBoolean cancellable = new AtomicBoolean(true);
@SuppressWarnings("UnusedDeclaration")
private volatile int uncancellable;
ScheduledFutureTask(TaskScheduler scheduler, EventExecutor executor,
Runnable runnable, V result, long nanoTime) {
@ -394,11 +399,21 @@ public final class TaskScheduler {
}
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public void run() {
try {
if (periodNanos == 0) {
if (cancellable.compareAndSet(true, false)) {
if (setUncancellable()) {
V result = task.call();
setSuccessInternal(result);
}
@ -432,12 +447,16 @@ public final class TaskScheduler {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!isDone()) {
if (cancellable.compareAndSet(true, false)) {
if (setUncancellable()) {
return tryFailureInternal(new CancellationException());
}
}
return false;
}
private boolean setUncancellable() {
return uncancellableUpdater.compareAndSet(this, 0, 1);
}
}
private final class PurgeTask implements Runnable {

View File

@ -15,9 +15,7 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.concurrent.TaskScheduler;
import java.util.concurrent.ThreadFactory;
@ -27,13 +25,9 @@ import java.util.concurrent.ThreadFactory;
*/
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
/**
*
* @see SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, TaskScheduler)
*/
protected SingleThreadEventLoop(
EventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
EventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override

View File

@ -26,7 +26,7 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
private Channel ch;
public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) {
super(parent, parent.threadFactory, parent.scheduler);
super(parent, parent.threadFactory);
this.parent = parent;
}

View File

@ -16,7 +16,6 @@
package io.netty.channel;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.internal.PlatformDependent;
import java.util.Collections;
@ -37,7 +36,6 @@ public class ThreadPerChannelEventLoopGroup implements EventLoopGroup {
private final Object[] childArgs;
private final int maxChannels;
final TaskScheduler scheduler;
final ThreadFactory threadFactory;
final Set<ThreadPerChannelEventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<ThreadPerChannelEventLoop, Boolean>newConcurrentHashMap());
@ -94,8 +92,6 @@ public class ThreadPerChannelEventLoopGroup implements EventLoopGroup {
this.maxChannels = maxChannels;
this.threadFactory = threadFactory;
scheduler = new TaskScheduler(threadFactory);
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
tooManyChannels.setStackTrace(STACK_ELEMENTS);
}
@ -115,7 +111,6 @@ public class ThreadPerChannelEventLoopGroup implements EventLoopGroup {
@Override
public void shutdown() {
scheduler.shutdown();
for (EventLoop l: activeChildren) {
l.shutdown();
}
@ -126,9 +121,6 @@ public class ThreadPerChannelEventLoopGroup implements EventLoopGroup {
@Override
public boolean isShutdown() {
if (!scheduler.isShutdown()) {
return false;
}
for (EventLoop l: activeChildren) {
if (!l.isShutdown()) {
return false;
@ -144,9 +136,6 @@ public class ThreadPerChannelEventLoopGroup implements EventLoopGroup {
@Override
public boolean isTerminated() {
if (!scheduler.isTerminated()) {
return false;
}
for (EventLoop l: activeChildren) {
if (!l.isTerminated()) {
return false;
@ -164,15 +153,6 @@ public class ThreadPerChannelEventLoopGroup implements EventLoopGroup {
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
for (EventLoop l: activeChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();

View File

@ -19,7 +19,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.channel.SingleThreadEventLoop;
import java.util.ArrayList;
@ -59,8 +58,8 @@ final class AioEventLoop extends SingleThreadEventLoop {
}
};
AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override

View File

@ -16,10 +16,9 @@
package io.netty.channel.aio;
import io.netty.channel.Channel;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.EventLoopException;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
@ -107,9 +106,8 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
}
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new AioEventLoop(this, threadFactory, scheduler);
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new AioEventLoop(this, threadFactory);
}
private static final class AioExecutorService extends AbstractExecutorService {

View File

@ -16,15 +16,13 @@
package io.netty.channel.local;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.TaskScheduler;
import java.util.concurrent.ThreadFactory;
final class LocalEventLoop extends SingleThreadEventLoop {
LocalEventLoop(
LocalEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override

View File

@ -15,9 +15,8 @@
*/
package io.netty.channel.local;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.ThreadFactory;
@ -54,7 +53,7 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new LocalEventLoop(this, threadFactory, scheduler);
ThreadFactory threadFactory, Object... args) throws Exception {
return new LocalEventLoop(this, threadFactory);
}
}

View File

@ -21,7 +21,6 @@ import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -107,9 +106,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private boolean needsToSelectAgain;
NioEventLoop(
NioEventLoopGroup parent, ThreadFactory threadFactory,
TaskScheduler scheduler, SelectorProvider selectorProvider) {
super(parent, threadFactory, scheduler);
NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
@ -591,8 +589,13 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
private int select() throws IOException {
long delayMillis = delayMillis();
try {
return selector.select(SELECT_TIMEOUT);
if (delayMillis > 0) {
return selector.select(delayMillis);
} else {
return selector.selectNow();
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(

View File

@ -18,7 +18,6 @@ package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.TaskScheduler;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
@ -84,7 +83,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, scheduler, (SelectorProvider) args[0]);
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
}

View File

@ -16,13 +16,13 @@
package io.netty.channel;
import io.netty.channel.local.LocalChannel;
import io.netty.util.concurrent.TaskScheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@ -36,37 +36,51 @@ import static org.junit.Assert.*;
public class SingleThreadEventLoopTest {
private SingleThreadEventLoopImpl loop;
private SingleThreadEventLoopA loopA;
private SingleThreadEventLoopB loopB;
@Before
public void newEventLoop() {
loop = new SingleThreadEventLoopImpl();
loopA = new SingleThreadEventLoopA();
loopB = new SingleThreadEventLoopB();
}
@After
public void stopEventLoop() {
if (!loop.isShutdown()) {
loop.shutdown();
if (!loopA.isShutdown()) {
loopA.shutdown();
}
while (!loop.isTerminated()) {
if (!loopB.isShutdown()) {
loopB.shutdown();
}
while (!loopA.isTerminated()) {
try {
loop.awaitTermination(1, TimeUnit.DAYS);
loopA.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
// Ignore
}
}
assertEquals(1, loopA.cleanedUp.get());
while (!loopB.isTerminated()) {
try {
loopB.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
// Ignore
}
}
assertEquals(1, loop.cleanedUp.get());
}
@Test
public void shutdownBeforeStart() throws Exception {
loop.shutdown();
loopA.shutdown();
}
@Test
public void shutdownAfterStart() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
loop.execute(new Runnable() {
loopA.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
@ -77,21 +91,30 @@ public class SingleThreadEventLoopTest {
latch.await();
// Request the event loop thread to stop.
loop.shutdown();
loopA.shutdown();
assertTrue(loop.isShutdown());
assertTrue(loopA.isShutdown());
// Wait until the event loop is terminated.
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
while (!loopA.isTerminated()) {
loopA.awaitTermination(1, TimeUnit.DAYS);
}
}
@Test
public void scheduleTask() throws Exception {
public void scheduleTaskA() throws Exception {
testScheduleTask(loopA);
}
@Test
public void scheduleTaskB() throws Exception {
testScheduleTask(loopB);
}
private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException {
long startTime = System.nanoTime();
final AtomicLong endTime = new AtomicLong();
loop.schedule(new Runnable() {
loopA.schedule(new Runnable() {
@Override
public void run() {
endTime.set(System.nanoTime());
@ -101,9 +124,18 @@ public class SingleThreadEventLoopTest {
}
@Test
public void scheduleTaskAtFixedRate() throws Exception {
public void scheduleTaskAtFixedRateA() throws Exception {
testScheduleTaskAtFixedRate(loopA);
}
@Test
public void scheduleTaskAtFixedRateB() throws Exception {
testScheduleTaskAtFixedRate(loopB);
}
private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loop.scheduleAtFixedRate(new Runnable() {
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
timestamps.add(System.nanoTime());
@ -132,9 +164,18 @@ public class SingleThreadEventLoopTest {
}
@Test
public void scheduleLaggyTaskAtFixedRate() throws Exception {
public void scheduleLaggyTaskAtFixedRateA() throws Exception {
testScheduleLaggyTaskAtFixedRate(loopA);
}
@Test
public void scheduleLaggyTaskAtFixedRateB() throws Exception {
testScheduleLaggyTaskAtFixedRate(loopB);
}
private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loop.scheduleAtFixedRate(new Runnable() {
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
boolean empty = timestamps.isEmpty();
@ -173,9 +214,18 @@ public class SingleThreadEventLoopTest {
}
@Test
public void scheduleTaskWithFixedDelay() throws Exception {
public void scheduleTaskWithFixedDelayA() throws Exception {
testScheduleTaskWithFixedDelay(loopA);
}
@Test
public void scheduleTaskWithFixedDelayB() throws Exception {
testScheduleTaskWithFixedDelay(loopB);
}
private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loop.scheduleWithFixedDelay(new Runnable() {
ScheduledFuture<?> f = loopA.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
timestamps.add(System.nanoTime());
@ -223,7 +273,7 @@ public class SingleThreadEventLoopTest {
};
for (int i = 0; i < NUM_TASKS; i ++) {
loop.execute(task);
loopA.execute(task);
}
// At this point, the first task should be running and stuck at latch.await().
@ -233,14 +283,14 @@ public class SingleThreadEventLoopTest {
assertEquals(1, ranTasks.get());
// Shut down the event loop to test if the other tasks are run before termination.
loop.shutdown();
loopA.shutdown();
// Let the other tasks run.
latch.countDown();
// Wait until the event loop is terminated.
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
while (!loopA.isTerminated()) {
loopA.awaitTermination(1, TimeUnit.DAYS);
}
// Make sure loop.shutdown() above triggered wakeup().
@ -249,12 +299,12 @@ public class SingleThreadEventLoopTest {
@Test(timeout = 10000)
public void testRegistrationAfterTermination() throws Exception {
loop.shutdown();
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
loopA.shutdown();
while (!loopA.isTerminated()) {
loopA.awaitTermination(1, TimeUnit.DAYS);
}
ChannelFuture f = loop.register(new LocalChannel());
ChannelFuture f = loopA.register(new LocalChannel());
f.awaitUninterruptibly();
assertFalse(f.isSuccess());
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
@ -262,9 +312,9 @@ public class SingleThreadEventLoopTest {
@Test(timeout = 10000)
public void testRegistrationAfterTermination2() throws Exception {
loop.shutdown();
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
loopA.shutdown();
while (!loopA.isTerminated()) {
loopA.awaitTermination(1, TimeUnit.DAYS);
}
final CountDownLatch latch = new CountDownLatch(1);
@ -277,7 +327,7 @@ public class SingleThreadEventLoopTest {
}
});
ChannelFuture f = loop.register(ch, promise);
ChannelFuture f = loopA.register(ch, promise);
f.awaitUninterruptibly();
assertFalse(f.isSuccess());
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
@ -286,13 +336,12 @@ public class SingleThreadEventLoopTest {
assertFalse(latch.await(1, TimeUnit.SECONDS));
}
private static class SingleThreadEventLoopImpl extends SingleThreadEventLoop {
private static class SingleThreadEventLoopA extends SingleThreadEventLoop {
final AtomicInteger cleanedUp = new AtomicInteger();
SingleThreadEventLoopImpl() {
super(null, Executors.defaultThreadFactory(),
new TaskScheduler(Executors.defaultThreadFactory()));
SingleThreadEventLoopA() {
super(null, Executors.defaultThreadFactory());
}
@Override
@ -317,4 +366,33 @@ public class SingleThreadEventLoopTest {
cleanedUp.incrementAndGet();
}
}
private static class SingleThreadEventLoopB extends SingleThreadEventLoop {
SingleThreadEventLoopB() {
super(null, Executors.defaultThreadFactory());
}
@Override
protected void run() {
for (;;) {
try {
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos()));
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
runAllTasks();
if (isShutdown() && confirmShutdown()) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
interruptThread();
}
}
}