From 421eabe666f5eb3c760c757e991e4f37213f7333 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sat, 18 Aug 2012 18:40:21 +0900 Subject: [PATCH] [#473] Fix elevated context switching in SingleThreadEventExecutor - Remove polling in SingleThreadEventExecutor - Create a dedicated scheduled task scheduler called 'TaskScheduler' - TaskScheduler is created per EventLoopGroup / EventExecutorGroup - SingleThreadEventExecutor delegates all scheduled execution requests to TaskScheduler provided as a constructor parameter - TaskScheduler is a specialized form of single threaded ScheduledExecutorService which requires an EventExecutor as a parameter for all requests. --- .../netty/channel/DefaultEventExecutor.java | 5 +- .../channel/DefaultEventExecutorGroup.java | 5 +- .../MultithreadEventExecutorGroup.java | 24 +- .../channel/SingleThreadEventExecutor.java | 298 +----------- .../netty/channel/SingleThreadEventLoop.java | 5 +- .../java/io/netty/channel/TaskScheduler.java | 426 ++++++++++++++++++ .../netty/channel/local/LocalEventLoop.java | 6 +- .../channel/local/LocalEventLoopGroup.java | 6 +- .../socket/aio/AbstractAioChannel.java | 4 +- ...oChildEventLoop.java => AioEventLoop.java} | 7 +- .../channel/socket/aio/AioEventLoopGroup.java | 6 +- .../channel/socket/nio/NioEventLoop.java | 7 +- .../channel/socket/nio/NioEventLoopGroup.java | 9 +- .../channel/socket/oio/OioEventLoop.java | 2 +- .../channel/socket/oio/OioEventLoopGroup.java | 20 + .../channel/SingleThreadEventLoopTest.java | 3 +- 16 files changed, 524 insertions(+), 309 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/TaskScheduler.java rename transport/src/main/java/io/netty/channel/socket/aio/{AioChildEventLoop.java => AioEventLoop.java} (85%) diff --git a/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java index 33c6993e00..c783d3d000 100644 --- a/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java @@ -19,8 +19,9 @@ import java.util.concurrent.ThreadFactory; class DefaultEventExecutor extends SingleThreadEventExecutor { - DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + DefaultEventExecutor( + DefaultEventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { + super(parent, threadFactory, scheduler); } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java b/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java index 424c095a41..012b39d132 100644 --- a/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java @@ -28,7 +28,8 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup { } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new DefaultEventExecutor(this, threadFactory); + protected EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception { + return new DefaultEventExecutor(this, threadFactory, scheduler); } } diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java index 8622a1e177..8601cfaea7 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java @@ -24,6 +24,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; private static final AtomicInteger poolId = new AtomicInteger(); + final TaskScheduler scheduler; private final EventExecutor[] children; private final AtomicInteger childIndex = new AtomicInteger(); @@ -40,11 +41,13 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou threadFactory = new DefaultThreadFactory(); } + scheduler = new TaskScheduler(threadFactory); + children = new SingleThreadEventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { - children[i] = newChild(threadFactory, args); + children[i] = newChild(threadFactory, scheduler, args); success = true; } catch (Exception e) { throw new EventLoopException("failed to create a child event loop", e); @@ -63,10 +66,12 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } - protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception; + protected abstract EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception; @Override public void shutdown() { + scheduler.shutdown(); for (EventExecutor l: children) { l.shutdown(); } @@ -74,6 +79,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou @Override public boolean isShutdown() { + if (!scheduler.isShutdown()) { + return false; + } for (EventExecutor l: children) { if (!l.isShutdown()) { return false; @@ -84,6 +92,9 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou @Override public boolean isTerminated() { + if (!scheduler.isTerminated()) { + return false; + } for (EventExecutor l: children) { if (!l.isTerminated()) { return false; @@ -96,6 +107,15 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.nanoTime() + unit.toNanos(timeout); + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } loop: for (EventExecutor l: children) { for (;;) { long timeLeft = deadline - System.nanoTime(); diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index f1321362bb..3570d46617 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -20,35 +20,24 @@ import io.netty.logging.InternalLoggerFactory; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; -import java.util.Queue; import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; -import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); - private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10); - private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); - private static final long START_TIME = System.nanoTime(); - private static final AtomicLong nextTaskId = new AtomicLong(); - static final ThreadLocal CURRENT_EVENT_LOOP = new ThreadLocal(); @@ -56,33 +45,27 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService return CURRENT_EVENT_LOOP.get(); } - private static long nanoTime() { - return System.nanoTime() - START_TIME; - } - - private static long deadlineNanos(long delay) { - return nanoTime() + delay; - } - private final EventExecutorGroup parent; private final BlockingQueue taskQueue = new LinkedBlockingQueue(); private final Thread thread; private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); - // TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue. - private final Queue> scheduledTasks = new DelayQueue>(); + private final TaskScheduler scheduler; private final Set shutdownHooks = new LinkedHashSet(); /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ private volatile int state; - private long lastCheckTimeNanos; - private long lastPurgeTimeNanos; - protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) { + protected SingleThreadEventExecutor( + EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } + if (scheduler == null) { + throw new NullPointerException("scheduler"); + } this.parent = parent; + this.scheduler = scheduler; thread = threadFactory.newThread(new Runnable() { @Override @@ -115,7 +98,6 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService private void cleanupTasks() { for (;;) { boolean ran = false; - cancelScheduledTasks(); ran |= runAllTasks(); ran |= runShutdownHooks(); if (!ran && !hasTasks()) { @@ -142,65 +124,22 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService protected Runnable pollTask() { assert inEventLoop(); - - Runnable task = taskQueue.poll(); - if (task != null) { - return task; - } - - if (fetchScheduledTasks()) { - task = taskQueue.poll(); - return task; - } - - return null; + return taskQueue.poll(); } protected Runnable takeTask() throws InterruptedException { assert inEventLoop(); - - for (;;) { - Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS); - if (task != null) { - return task; - } - fetchScheduledTasks(); - task = taskQueue.poll(); - if (task != null) { - return task; - } - } + return taskQueue.take(); } protected Runnable peekTask() { assert inEventLoop(); - - Runnable task = taskQueue.peek(); - if (task != null) { - return task; - } - - if (fetchScheduledTasks()) { - task = taskQueue.peek(); - return task; - } - - return null; + return taskQueue.peek(); } protected boolean hasTasks() { assert inEventLoop(); - - boolean empty = taskQueue.isEmpty(); - if (!empty) { - return true; - } - - if (fetchScheduledTasks()) { - return !taskQueue.isEmpty(); - } - - return false; + return !taskQueue.isEmpty(); } protected void addTask(Runnable task) { @@ -397,228 +336,21 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask(command, null, deadlineNanos(unit.toNanos(delay)))); + return scheduler.schedule(this, command, delay, unit); } @Override public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - if (callable == null) { - throw new NullPointerException("callable"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (delay < 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: >= 0)", delay)); - } - return schedule(new ScheduledFutureTask(callable, deadlineNanos(unit.toNanos(delay)))); + return scheduler.schedule(this, callable, delay, unit); } @Override public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (period <= 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: > 0)", period)); - } - - return schedule(new ScheduledFutureTask( - command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + return scheduler.scheduleAtFixedRate(this, command, initialDelay, period, unit); } @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - if (command == null) { - throw new NullPointerException("command"); - } - if (unit == null) { - throw new NullPointerException("unit"); - } - if (initialDelay < 0) { - throw new IllegalArgumentException( - String.format("initialDelay: %d (expected: >= 0)", initialDelay)); - } - if (delay <= 0) { - throw new IllegalArgumentException( - String.format("delay: %d (expected: > 0)", delay)); - } - - return schedule(new ScheduledFutureTask( - command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); - } - - private ScheduledFuture schedule(ScheduledFutureTask task) { - if (isShutdown()) { - reject(); - } - scheduledTasks.add(task); - if (isShutdown()) { - task.cancel(false); - } - - if (!inEventLoop()) { - synchronized (stateLock) { - if (state == 0) { - state = 1; - thread.start(); - } - } - } else { - fetchScheduledTasks(); - } - - return task; - } - - private boolean fetchScheduledTasks() { - if (scheduledTasks.isEmpty()) { - return false; - } - - long nanoTime = nanoTime(); - if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) { - for (Iterator> i = scheduledTasks.iterator(); i.hasNext();) { - ScheduledFutureTask task = i.next(); - if (task.isCancelled()) { - i.remove(); - } - } - } - - if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) { - boolean added = false; - for (;;) { - ScheduledFutureTask task = scheduledTasks.poll(); - if (task == null) { - break; - } - - if (!task.isCancelled()) { - if (isShutdown()) { - task.cancel(false); - } else { - taskQueue.add(task); - added = true; - } - } - } - return added; - } - - return false; - } - - private void cancelScheduledTasks() { - if (scheduledTasks.isEmpty()) { - return; - } - - for (ScheduledFutureTask task: scheduledTasks.toArray(new ScheduledFutureTask[scheduledTasks.size()])) { - task.cancel(false); - } - scheduledTasks.clear(); - } - - private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { - - private final long id = nextTaskId.getAndIncrement(); - private long deadlineNanos; - /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ - private final long periodNanos; - - ScheduledFutureTask(Runnable runnable, V result, long nanoTime) { - super(runnable, result); - deadlineNanos = nanoTime; - periodNanos = 0; - } - - ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) { - super(runnable, result); - if (period == 0) { - throw new IllegalArgumentException( - String.format("period: %d (expected: != 0)", period)); - } - deadlineNanos = nanoTime; - periodNanos = period; - } - - ScheduledFutureTask(Callable callable, long nanoTime) { - super(callable); - deadlineNanos = nanoTime; - periodNanos = 0; - } - - public long deadlineNanos() { - return deadlineNanos; - } - - public long delayNanos() { - return Math.max(0, deadlineNanos() - nanoTime()); - } - - @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (this == o) { - return 0; - } - - ScheduledFutureTask that = (ScheduledFutureTask) o; - long d = deadlineNanos() - that.deadlineNanos(); - if (d < 0) { - return -1; - } else if (d > 0) { - return 1; - } else if (id < that.id) { - return -1; - } else if (id == that.id) { - throw new Error(); - } else { - return 1; - } - } - - @Override - public void run() { - if (periodNanos == 0) { - super.run(); - } else { - boolean reset = runAndReset(); - if (reset && !isShutdown()) { - long p = periodNanos; - if (p > 0) { - deadlineNanos += p; - } else { - deadlineNanos = nanoTime() - p; - } - - schedule(this); - } - } - } + return scheduler.scheduleWithFixedDelay(this, command, initialDelay, delay, unit); } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index c4d4c6b9fd..05d90c1cdd 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -19,8 +19,9 @@ import java.util.concurrent.ThreadFactory; public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { - protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + protected SingleThreadEventLoop( + EventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { + super(parent, threadFactory, scheduler); } @Override diff --git a/transport/src/main/java/io/netty/channel/TaskScheduler.java b/transport/src/main/java/io/netty/channel/TaskScheduler.java new file mode 100644 index 0000000000..b525815a8d --- /dev/null +++ b/transport/src/main/java/io/netty/channel/TaskScheduler.java @@ -0,0 +1,426 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; + +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public final class TaskScheduler { + + private static final InternalLogger logger = + InternalLoggerFactory.getInstance(TaskScheduler.class); + + private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); + private static final long START_TIME = System.nanoTime(); + private static final AtomicLong nextTaskId = new AtomicLong(); + + private static long nanoTime() { + return System.nanoTime() - START_TIME; + } + + private static long deadlineNanos(long delay) { + return nanoTime() + delay; + } + + private final BlockingQueue> taskQueue = new DelayQueue>(); + private final Thread thread; + private final Object stateLock = new Object(); + private final Semaphore threadLock = new Semaphore(0); + /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ + private volatile int state; + + public TaskScheduler(ThreadFactory threadFactory) { + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + + thread = threadFactory.newThread(new Runnable() { + @Override + public void run() { + try { + for (;;) { + ScheduledFutureTask task; + try { + task = taskQueue.take(); + runTask(task); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + + if (isShutdown() && taskQueue.peek() == null) { + break; + } + } + } finally { + try { + // Run all remaining tasks and shutdown hooks. + try { + cleanupTasks(); + } finally { + synchronized (stateLock) { + state = 3; + } + } + cleanupTasks(); + } finally { + threadLock.release(); + assert taskQueue.isEmpty(); + } + } + } + + private void runTask(ScheduledFutureTask task) { + EventExecutor executor = task.executor; + if (executor == null) { + task.run(); + } else { + if (executor.isShutdown()) { + task.cancel(false); + } else { + try { + task.executor.execute(task); + } catch (RejectedExecutionException e) { + task.cancel(false); + } + } + } + } + + private void cleanupTasks() { + for (;;) { + boolean ran = false; + cancelScheduledTasks(); + for (;;) { + final ScheduledFutureTask task = taskQueue.poll(); + if (task == null) { + break; + } + + try { + runTask(task); + ran = true; + } catch (Throwable t) { + logger.warn("A task raised an exception.", t); + } + } + + if (!ran && taskQueue.isEmpty()) { + break; + } + } + } + }); + } + + private boolean inSameThread() { + return Thread.currentThread() == thread; + } + + public void shutdown() { + boolean inSameThread = inSameThread(); + boolean wakeup = false; + if (inSameThread) { + synchronized (stateLock) { + assert state == 1; + state = 2; + wakeup = true; + } + } else { + synchronized (stateLock) { + switch (state) { + case 0: + state = 3; + threadLock.release(); + break; + case 1: + state = 2; + wakeup = true; + break; + } + } + } + + if (wakeup && !inSameThread && isShutdown()) { + thread.interrupt(); + } + } + + public boolean isShutdown() { + return state >= 2; + } + + public boolean isTerminated() { + return state == 3; + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (unit == null) { + throw new NullPointerException("unit"); + } + + if (inSameThread()) { + throw new IllegalStateException("cannot await termination of the current thread"); + } + + if (threadLock.tryAcquire(timeout, unit)) { + threadLock.release(); + } + + return isTerminated(); + } + + public ScheduledFuture schedule( + EventExecutor executor, Runnable command, long delay, TimeUnit unit) { + if (executor == null) { + throw new NullPointerException("executor"); + } + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask(executor, command, null, deadlineNanos(unit.toNanos(delay)))); + } + + public ScheduledFuture schedule( + EventExecutor executor, Callable callable, long delay, TimeUnit unit) { + if (executor == null) { + throw new NullPointerException("executor"); + } + if (callable == null) { + throw new NullPointerException("callable"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask(executor, callable, deadlineNanos(unit.toNanos(delay)))); + } + + public ScheduledFuture scheduleAtFixedRate( + EventExecutor executor, Runnable command, long initialDelay, long period, TimeUnit unit) { + if (executor == null) { + throw new NullPointerException("executor"); + } + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (period <= 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: > 0)", period)); + } + + return schedule(new ScheduledFutureTask( + executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + } + + public ScheduledFuture scheduleWithFixedDelay( + EventExecutor executor, Runnable command, long initialDelay, long delay, TimeUnit unit) { + if (executor == null) { + throw new NullPointerException("executor"); + } + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (delay <= 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: > 0)", delay)); + } + + return schedule(new ScheduledFutureTask( + executor, command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + } + + private ScheduledFuture schedule(ScheduledFutureTask task) { + if (isShutdown()) { + reject(); + } + taskQueue.add(task); + if (isShutdown()) { + task.cancel(false); + } + + boolean started = false; + if (!inSameThread()) { + synchronized (stateLock) { + if (state == 0) { + state = 1; + thread.start(); + started = true; + } + } + } + + if (started) { + schedule(new ScheduledFutureTask( + null, new PurgeTask(), null, + deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); + } + + return task; + } + + private static void reject() { + throw new RejectedExecutionException("event executor shut down"); + } + + private void cancelScheduledTasks() { + if (taskQueue.isEmpty()) { + return; + } + + for (ScheduledFutureTask task: taskQueue.toArray(new ScheduledFutureTask[taskQueue.size()])) { + task.cancel(false); + } + + taskQueue.clear(); + } + + private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { + + private final EventExecutor executor; + private final long id = nextTaskId.getAndIncrement(); + private long deadlineNanos; + /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ + private final long periodNanos; + + ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime) { + super(runnable, result); + this.executor = executor; + deadlineNanos = nanoTime; + periodNanos = 0; + } + + ScheduledFutureTask(EventExecutor executor, Runnable runnable, V result, long nanoTime, long period) { + super(runnable, result); + if (period == 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: != 0)", period)); + } + this.executor = executor; + deadlineNanos = nanoTime; + periodNanos = period; + } + + ScheduledFutureTask(EventExecutor executor, Callable callable, long nanoTime) { + super(callable); + this.executor = executor; + deadlineNanos = nanoTime; + periodNanos = 0; + } + + public long deadlineNanos() { + return deadlineNanos; + } + + public long delayNanos() { + return Math.max(0, deadlineNanos() - nanoTime()); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (this == o) { + return 0; + } + + ScheduledFutureTask that = (ScheduledFutureTask) o; + long d = deadlineNanos() - that.deadlineNanos(); + if (d < 0) { + return -1; + } else if (d > 0) { + return 1; + } else if (id < that.id) { + return -1; + } else if (id == that.id) { + throw new Error(); + } else { + return 1; + } + } + + @Override + public void run() { + if (periodNanos == 0) { + super.run(); + } else { + boolean reset = runAndReset(); + if (reset && !isShutdown()) { + long p = periodNanos; + if (p > 0) { + deadlineNanos += p; + } else { + deadlineNanos = nanoTime() - p; + } + + schedule(this); + } + } + } + } + + private final class PurgeTask implements Runnable { + @Override + public void run() { + Iterator> i = taskQueue.iterator(); + while (i.hasNext()) { + ScheduledFutureTask task = i.next(); + if (task.isCancelled()) { + i.remove(); + } + } + } + } +} diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java index 10047a86d5..fc304a3756 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java @@ -16,13 +16,15 @@ package io.netty.channel.local; import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.TaskScheduler; import java.util.concurrent.ThreadFactory; final class LocalEventLoop extends SingleThreadEventLoop { - LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + LocalEventLoop( + LocalEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { + super(parent, threadFactory, scheduler); } @Override diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java index 3d6dba41eb..bf1a00e032 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java @@ -17,6 +17,7 @@ package io.netty.channel.local; import io.netty.channel.EventExecutor; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.TaskScheduler; import java.util.concurrent.ThreadFactory; @@ -35,7 +36,8 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup { } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new LocalEventLoop(this, threadFactory); + protected EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception { + return new LocalEventLoop(this, threadFactory, scheduler); } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index bbb000b15f..ac31001d50 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -68,7 +68,7 @@ abstract class AbstractAioChannel extends AbstractChannel { @Override protected Runnable doRegister() throws Exception { - if (((AioChildEventLoop) eventLoop()).parent() != group) { + if (((AioEventLoop) eventLoop()).parent() != group) { throw new ChannelException( getClass().getSimpleName() + " must be registered to the " + AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor."); @@ -83,7 +83,7 @@ abstract class AbstractAioChannel extends AbstractChannel { @Override protected boolean isCompatible(EventLoop loop) { - return loop instanceof AioChildEventLoop; + return loop instanceof AioEventLoop; } protected abstract class AbstractAioUnsafe extends AbstractUnsafe { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java similarity index 85% rename from transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java rename to transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java index cae4ea1469..8578eb5c84 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java @@ -16,13 +16,14 @@ package io.netty.channel.socket.aio; import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.TaskScheduler; import java.util.concurrent.ThreadFactory; -final class AioChildEventLoop extends SingleThreadEventLoop { +final class AioEventLoop extends SingleThreadEventLoop { - AioChildEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory); + AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) { + super(parent, threadFactory, scheduler); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java index 2abaa66a8b..72f5caa794 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java @@ -18,6 +18,7 @@ package io.netty.channel.socket.aio; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoopException; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.TaskScheduler; import java.io.IOException; import java.lang.reflect.Field; @@ -57,8 +58,9 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup { } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new AioChildEventLoop(this, threadFactory); + protected EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception { + return new AioEventLoop(this, threadFactory, scheduler); } private void executeAioTask(Runnable command) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java index 4775a2d91b..a2a5f99153 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java @@ -18,6 +18,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.TaskScheduler; import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -60,8 +61,10 @@ final class NioEventLoop extends SingleThreadEventLoop { private int cancelledKeys; private boolean cleanedCancelledKeys; - NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { - super(parent, threadFactory); + NioEventLoop( + NioEventLoopGroup parent, ThreadFactory threadFactory, + TaskScheduler scheduler, SelectorProvider selectorProvider) { + super(parent, threadFactory, scheduler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java index 48f6c98fbc..68a1b38bcf 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.EventExecutor; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.TaskScheduler; import java.nio.channels.spi.SelectorProvider; import java.util.concurrent.ThreadFactory; @@ -35,18 +36,20 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { super(nThreads, threadFactory); } - public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { + public NioEventLoopGroup( + int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { super(nThreads, threadFactory, selectorProvider); } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + protected EventExecutor newChild( + ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception { SelectorProvider selectorProvider; if (args == null || args.length == 0 || args[0] == null) { selectorProvider = SelectorProvider.provider(); } else { selectorProvider = (SelectorProvider) args[0]; } - return new NioEventLoop(this, threadFactory, selectorProvider); + return new NioEventLoop(this, threadFactory, scheduler, selectorProvider); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java index 7049f7080f..5492769af9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java @@ -27,7 +27,7 @@ class OioEventLoop extends SingleThreadEventLoop { private AbstractOioChannel ch; OioEventLoop(OioEventLoopGroup parent) { - super(parent, parent.threadFactory); + super(parent, parent.threadFactory, parent.scheduler); this.parent = parent; } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java index 76e473faf2..d0b37deacc 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.TaskScheduler; import java.util.Collections; import java.util.Queue; @@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit; public class OioEventLoopGroup implements EventLoopGroup { private final int maxChannels; + final TaskScheduler scheduler; final ThreadFactory threadFactory; final Set activeChildren = Collections.newSetFromMap( new ConcurrentHashMap()); @@ -60,6 +62,8 @@ public class OioEventLoopGroup implements EventLoopGroup { this.maxChannels = maxChannels; this.threadFactory = threadFactory; + scheduler = new TaskScheduler(threadFactory); + tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')'); tooManyChannels.setStackTrace(new StackTraceElement[0]); } @@ -71,6 +75,7 @@ public class OioEventLoopGroup implements EventLoopGroup { @Override public void shutdown() { + scheduler.shutdown(); for (EventLoop l: activeChildren) { l.shutdown(); } @@ -81,6 +86,9 @@ public class OioEventLoopGroup implements EventLoopGroup { @Override public boolean isShutdown() { + if (!scheduler.isShutdown()) { + return false; + } for (EventLoop l: activeChildren) { if (!l.isShutdown()) { return false; @@ -96,6 +104,9 @@ public class OioEventLoopGroup implements EventLoopGroup { @Override public boolean isTerminated() { + if (!scheduler.isTerminated()) { + return false; + } for (EventLoop l: activeChildren) { if (!l.isTerminated()) { return false; @@ -113,6 +124,15 @@ public class OioEventLoopGroup implements EventLoopGroup { public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.nanoTime() + unit.toNanos(timeout); + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (scheduler.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } for (EventLoop l: activeChildren) { for (;;) { long timeLeft = deadline - System.nanoTime(); diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index bb9eaa0481..951b28515a 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -256,7 +256,8 @@ public class SingleThreadEventLoopTest { final AtomicInteger cleanedUp = new AtomicInteger(); SingleThreadEventLoopImpl() { - super(null, Executors.defaultThreadFactory()); + super(null, Executors.defaultThreadFactory(), + new TaskScheduler(Executors.defaultThreadFactory())); } @Override