From 17492109850e5cd1ba22cacb3e02fc6f705af073 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 12 Jun 2013 06:40:01 +0900 Subject: [PATCH] Add GlobalEventExecutor - Related issue: #1389 - Also extracted SingleThreadEventExecutor.ScheduledFutureTask into a top level class for a reuse --- .../concurrent/AbstractEventExecutor.java | 5 + .../util/concurrent/GlobalEventExecutor.java | 356 ++++++++++++++++++ .../util/concurrent/ScheduledFutureTask.java | 148 ++++++++ .../concurrent/SingleThreadEventExecutor.java | 154 +------- .../concurrent/GlobalEventExecutorTest.java | 112 ++++++ 5 files changed, 640 insertions(+), 135 deletions(-) create mode 100644 common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java create mode 100644 common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java create mode 100644 common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 55ea87aedf..990e001b32 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -34,6 +34,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl return this; } + @Override + public boolean inEventLoop() { + return inEventLoop(Thread.currentThread()); + } + @Override public Iterator iterator() { return new EventExecutorIterator(); diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java new file mode 100644 index 0000000000..58abc7a8eb --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -0,0 +1,356 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.util.Iterator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no + * task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to + * this executor; use a dedicated executor. + */ +public final class GlobalEventExecutor extends AbstractEventExecutor { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class); + + private static final int ST_NOT_STARTED = 1; + private static final int ST_STARTED = 2; + + private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); + + public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); + + final Queue taskQueue = new LinkedBlockingQueue(); + final Queue> delayedTaskQueue = new PriorityQueue>(); + final ScheduledFutureTask purgeTask = new ScheduledFutureTask( + this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), + ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL); + + private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass()); + private final TaskRunner taskRunner = new TaskRunner(); + private final Object stateLock = new Object(); + + volatile Thread thread; + private volatile int state = ST_NOT_STARTED; + + private GlobalEventExecutor() { + delayedTaskQueue.add(purgeTask); + } + + @Override + public EventExecutorGroup parent() { + return null; + } + + /** + * Take the next {@link Runnable} from the task queue and so will block if no task is currently present. + * + * @return {@code null} if the executor thread has been interrupted or waken up. + */ + Runnable takeTask() { + BlockingQueue taskQueue = (BlockingQueue) this.taskQueue; + for (;;) { + ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); + if (delayedTask == null) { + Runnable task = null; + try { + task = taskQueue.take(); + } catch (InterruptedException e) { + // Ignore + } + return task; + } else { + long delayNanos = delayedTask.delayNanos(); + Runnable task; + if (delayNanos > 0) { + try { + task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + return null; + } + } else { + task = taskQueue.poll(); + } + + if (task == null) { + fetchFromDelayedQueue(); + task = taskQueue.poll(); + } + + if (task != null) { + return task; + } + } + } + } + + private void fetchFromDelayedQueue() { + long nanoTime = 0L; + for (;;) { + ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); + if (delayedTask == null) { + break; + } + + if (nanoTime == 0L) { + nanoTime = ScheduledFutureTask.nanoTime(); + } + + if (delayedTask.deadlineNanos() <= nanoTime) { + delayedTaskQueue.remove(); + taskQueue.add(delayedTask); + } else { + break; + } + } + } + + /** + * Return the number of tasks that are pending for processing. + * + * Be aware that this operation may be expensive as it depends on the internal implementation of the + * SingleThreadEventExecutor. So use it was care! + */ + public int pendingTasks() { + return taskQueue.size(); + } + + /** + * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown + * before. + */ + private void addTask(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + taskQueue.add(task); + } + + @Override + public boolean inEventLoop(Thread thread) { + return thread == this.thread; + } + + @Override + public void shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + @Deprecated + public void shutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShuttingDown() { + return false; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + return false; + } + + @Override + public void execute(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + + boolean inEventLoop = inEventLoop(); + if (inEventLoop) { + addTask(task); + } else { + startThread(); + addTask(task); + } + } + + // ScheduledExecutorService implementation + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask( + this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + if (callable == null) { + throw new NullPointerException("callable"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask( + this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (period <= 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: > 0)", period)); + } + + return schedule(new ScheduledFutureTask( + this, delayedTaskQueue, Executors.callable(command, null), + ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (delay <= 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: > 0)", delay)); + } + + return schedule(new ScheduledFutureTask( + this, delayedTaskQueue, Executors.callable(command, null), + ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + } + + private ScheduledFuture schedule(final ScheduledFutureTask task) { + if (task == null) { + throw new NullPointerException("task"); + } + + if (inEventLoop()) { + delayedTaskQueue.add(task); + } else { + execute(new Runnable() { + @Override + public void run() { + delayedTaskQueue.add(task); + } + }); + } + + return task; + } + + private void startThread() { + synchronized (stateLock) { + if (state == ST_NOT_STARTED) { + state = ST_STARTED; + + thread = threadFactory.newThread(taskRunner); + + thread.start(); + } + } + } + + final class TaskRunner implements Runnable { + @Override + public void run() { + for (;;) { + Runnable task = takeTask(); + if (task != null) { + try { + task.run(); + } catch (Throwable t) { + logger.warn("Unexpected exception from the global event executor: ", t); + } + + if (task != purgeTask) { + continue; + } + } + + if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) { + synchronized (stateLock) { + // Terminate if there is no task in the queue (except the purge task). + if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) { + state = ST_NOT_STARTED; + break; + } + } + } + } + } + } + + private final class PurgeTask implements Runnable { + @Override + public void run() { + Iterator> i = delayedTaskQueue.iterator(); + while (i.hasNext()) { + ScheduledFutureTask task = i.next(); + if (task.isCancelled()) { + i.remove(); + } + } + } + } +} diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java new file mode 100644 index 0000000000..fc7b5b4eb9 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -0,0 +1,148 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +@SuppressWarnings("ComparableImplementedButEqualsNotOverridden") +final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture { + private static final AtomicLong nextTaskId = new AtomicLong(); + private static final long START_TIME = System.nanoTime(); + + static long nanoTime() { + return System.nanoTime() - START_TIME; + } + + static long deadlineNanos(long delay) { + return nanoTime() + delay; + } + + private final long id = nextTaskId.getAndIncrement(); + private final Queue> delayedTaskQueue; + private long deadlineNanos; + /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ + private final long periodNanos; + + ScheduledFutureTask( + EventExecutor executor, Queue> delayedTaskQueue, + Runnable runnable, V result, long nanoTime) { + + this(executor, delayedTaskQueue, Executors.callable(runnable, result), nanoTime); + } + + ScheduledFutureTask( + EventExecutor executor, Queue> delayedTaskQueue, + Callable callable, long nanoTime, long period) { + + super(executor, callable); + if (period == 0) { + throw new IllegalArgumentException("period: 0 (expected: != 0)"); + } + this.delayedTaskQueue = delayedTaskQueue; + deadlineNanos = nanoTime; + periodNanos = period; + } + + ScheduledFutureTask( + EventExecutor executor, Queue> delayedTaskQueue, + Callable callable, long nanoTime) { + + super(executor, callable); + this.delayedTaskQueue = delayedTaskQueue; + deadlineNanos = nanoTime; + periodNanos = 0; + } + + @Override + protected EventExecutor executor() { + return super.executor(); + } + + public long deadlineNanos() { + return deadlineNanos; + } + + public long delayNanos() { + return Math.max(0, deadlineNanos() - nanoTime()); + } + + public long delayNanos(long currentTimeNanos) { + return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME)); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (this == o) { + return 0; + } + + ScheduledFutureTask that = (ScheduledFutureTask) o; + long d = deadlineNanos() - that.deadlineNanos(); + if (d < 0) { + return -1; + } else if (d > 0) { + return 1; + } else if (id < that.id) { + return -1; + } else if (id == that.id) { + throw new Error(); + } else { + return 1; + } + } + + @Override + public void run() { + assert executor().inEventLoop(); + try { + if (periodNanos == 0) { + if (setUncancellableInternal()) { + V result = task.call(); + setSuccessInternal(result); + } + } else { + // check if is done as it may was cancelled + if (!isCancelled()) { + task.call(); + if (!executor().isShutdown()) { + long p = periodNanos; + if (p > 0) { + deadlineNanos += p; + } else { + deadlineNanos = nanoTime() - p; + } + if (!isCancelled()) { + delayedTaskQueue.add(this); + } + } + } + } + } catch (Throwable cause) { + setFailureInternal(cause); + } + } +} diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 89c9510779..e8f1b504f8 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -27,14 +27,12 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.Delayed; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread. @@ -243,7 +241,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } if (nanoTime == 0L) { - nanoTime = nanoTime(); + nanoTime = ScheduledFutureTask.nanoTime(); } if (delayedTask.deadlineNanos() <= nanoTime) { @@ -326,7 +324,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { task = pollTask(); if (task == null) { - lastExecutionTime = nanoTime(); + lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } @@ -343,7 +341,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return false; } - final long deadline = nanoTime() + timeoutNanos; + final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { @@ -358,7 +356,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { - lastExecutionTime = nanoTime(); + lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } @@ -366,7 +364,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { task = pollTask(); if (task == null) { - lastExecutionTime = nanoTime(); + lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } @@ -395,7 +393,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { * checks. */ protected void updateLastExecutionTime() { - lastExecutionTime = nanoTime(); + lastExecutionTime = ScheduledFutureTask.nanoTime(); } /** @@ -416,11 +414,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } } - @Override - public boolean inEventLoop() { - return inEventLoop(Thread.currentThread()); - } - @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; @@ -476,7 +469,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } if (ran) { - lastExecutionTime = nanoTime(); + lastExecutionTime = ScheduledFutureTask.nanoTime(); } return ran; @@ -602,7 +595,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { cancelDelayedTasks(); if (gracefulShutdownStartTime == 0) { - gracefulShutdownStartTime = nanoTime(); + gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } if (runAllTasks() || runShutdownHooks()) { @@ -616,7 +609,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return false; } - final long nanoTime = nanoTime(); + final long nanoTime = ScheduledFutureTask.nanoTime(); if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; @@ -701,15 +694,6 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { // ScheduledExecutorService implementation private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); - private static final long START_TIME = System.nanoTime(); - private static final AtomicLong nextTaskId = new AtomicLong(); - private static long nanoTime() { - return System.nanoTime() - START_TIME; - } - - private static long deadlineNanos(long delay) { - return nanoTime() + delay; - } @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { @@ -723,7 +707,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } - return schedule(new ScheduledFutureTask(this, command, null, deadlineNanos(unit.toNanos(delay)))); + return schedule(new ScheduledFutureTask( + this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } @Override @@ -738,7 +723,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } - return schedule(new ScheduledFutureTask(this, callable, deadlineNanos(unit.toNanos(delay)))); + return schedule(new ScheduledFutureTask( + this, delayedTaskQueue, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } @Override @@ -759,8 +745,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } return schedule(new ScheduledFutureTask( - this, Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + this, delayedTaskQueue, Executors.callable(command, null), + ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); } @Override @@ -781,8 +767,8 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } return schedule(new ScheduledFutureTask( - this, Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + this, delayedTaskQueue, Executors.callable(command, null), + ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); } private ScheduledFuture schedule(final ScheduledFutureTask task) { @@ -809,115 +795,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { if (state == ST_NOT_STARTED) { state = ST_STARTED; delayedTaskQueue.add(new ScheduledFutureTask( - this, Executors.callable(new PurgeTask(), null), - deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); + this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), + ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL)); thread.start(); } } } - @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") - private static final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture { - - private final long id = nextTaskId.getAndIncrement(); - private long deadlineNanos; - /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ - private final long periodNanos; - - ScheduledFutureTask(SingleThreadEventExecutor executor, Runnable runnable, V result, long nanoTime) { - this(executor, Executors.callable(runnable, result), nanoTime); - } - - ScheduledFutureTask(SingleThreadEventExecutor executor, Callable callable, long nanoTime, long period) { - super(executor, callable); - if (period == 0) { - throw new IllegalArgumentException("period: 0 (expected: != 0)"); - } - deadlineNanos = nanoTime; - periodNanos = period; - } - - ScheduledFutureTask(SingleThreadEventExecutor executor, Callable callable, long nanoTime) { - super(executor, callable); - deadlineNanos = nanoTime; - periodNanos = 0; - } - - @Override - protected SingleThreadEventExecutor executor() { - return (SingleThreadEventExecutor) super.executor(); - } - - public long deadlineNanos() { - return deadlineNanos; - } - - public long delayNanos() { - return Math.max(0, deadlineNanos() - nanoTime()); - } - - public long delayNanos(long currentTimeNanos) { - return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME)); - } - - @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (this == o) { - return 0; - } - - ScheduledFutureTask that = (ScheduledFutureTask) o; - long d = deadlineNanos() - that.deadlineNanos(); - if (d < 0) { - return -1; - } else if (d > 0) { - return 1; - } else if (id < that.id) { - return -1; - } else if (id == that.id) { - throw new Error(); - } else { - return 1; - } - } - - @Override - public void run() { - assert executor().inEventLoop(); - try { - if (periodNanos == 0) { - if (setUncancellableInternal()) { - V result = task.call(); - setSuccessInternal(result); - } - } else { - // check if is done as it may was cancelled - if (!isCancelled()) { - task.call(); - if (!executor().isShutdown()) { - long p = periodNanos; - if (p > 0) { - deadlineNanos += p; - } else { - deadlineNanos = nanoTime() - p; - } - if (!isCancelled()) { - executor().delayedTaskQueue.add(this); - } - } - } - } - } catch (Throwable cause) { - setFailureInternal(cause); - } - } - } - private final class PurgeTask implements Runnable { @Override public void run() { diff --git a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java new file mode 100644 index 0000000000..cbc65df5d5 --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java @@ -0,0 +1,112 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +public class GlobalEventExecutorTest { + + private static final GlobalEventExecutor e = GlobalEventExecutor.INSTANCE; + + @Before + public void setUp() throws Exception { + // Wait until the global executor is stopped (just in case there is a task running due to previous test cases) + for (;;) { + if (e.thread == null || !e.thread.isAlive()) { + break; + } + + Thread.sleep(50); + } + } + + @Test + public void testAutomaticStartStop() throws Exception { + final TestRunnable task = new TestRunnable(500); + e.execute(task); + + // Ensure the new thread has started. + Thread thread = e.thread; + assertThat(thread, is(not(nullValue()))); + assertThat(thread.isAlive(), is(true)); + + Thread.sleep(1500); + + // Ensure the thread stopped itself after running the task. + assertThat(thread.isAlive(), is(false)); + assertThat(task.ran.get(), is(true)); + assertThat(e.thread, sameInstance(thread)); + + // Ensure another new thread starts again. + task.ran.set(false); + e.execute(task); + assertThat(e.thread, not(sameInstance(thread))); + thread = e.thread; + + Thread.sleep(1500); + + // Ensure the thread stopped itself after running the task. + assertThat(thread.isAlive(), is(false)); + assertThat(task.ran.get(), is(true)); + assertThat(e.thread, sameInstance(thread)); + } + + @Test + public void testScheduledTasks() throws Exception { + TestRunnable task = new TestRunnable(0); + ScheduledFuture f = e.schedule(task, 1500, TimeUnit.MILLISECONDS); + f.sync(); + assertThat(task.ran.get(), is(true)); + + // Ensure the thread is still running. + Thread thread = e.thread; + assertThat(thread, is(not(nullValue()))); + assertThat(thread.isAlive(), is(true)); + + Thread.sleep(1500); + + // Not it should be stopped. + assertThat(thread.isAlive(), is(false)); + assertThat(e.thread, sameInstance(thread)); + } + + private static final class TestRunnable implements Runnable { + final AtomicBoolean ran = new AtomicBoolean(); + final long delay; + + TestRunnable(long delay) { + this.delay = delay; + } + + @Override + public void run() { + try { + Thread.sleep(delay); + ran.set(true); + } catch (InterruptedException ignored) { + // Ignore + } + } + } +}