diff --git a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java index 02cc6fefea..bea2fc7da3 100644 --- a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java +++ b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -45,6 +46,7 @@ public final class ThreadDeathWatcher { private static final Queue pendingEntries = PlatformDependent.newMpscQueue(); private static final Watcher watcher = new Watcher(); private static final AtomicBoolean started = new AtomicBoolean(); + private static volatile Thread watcherThread; /** * Schedules the specified {@code task} to run when the specified {@code thread} dies. @@ -70,9 +72,31 @@ public final class ThreadDeathWatcher { if (started.compareAndSet(false, true)) { Thread watcherThread = threadFactory.newThread(watcher); watcherThread.start(); + ThreadDeathWatcher.watcherThread = watcherThread; } } + /** + * Waits until the thread of this watcher has no threads to watch and terminates itself. + * Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)}, + * this operation is only useful when you want to ensure that the watcher thread is terminated + * after your application is shut down and there's no chance of calling + * {@link #watch(Thread, Runnable)} afterwards. + * + * @return {@code true} if and only if the watcher thread has been terminated + */ + public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException { + if (unit == null) { + throw new NullPointerException("unit"); + } + + Thread watcherThread = ThreadDeathWatcher.watcherThread; + if (watcherThread != null) { + watcherThread.join(unit.toMillis(timeout)); + } + return !watcherThread.isAlive(); + } + private ThreadDeathWatcher() { } private static final class Watcher implements Runnable { diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index ca00e52a09..db7a5599fb 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no @@ -38,14 +39,11 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class); - private static final int ST_NOT_STARTED = 1; - private static final int ST_STARTED = 2; - private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); - final Queue taskQueue = new LinkedBlockingQueue(); + final BlockingQueue taskQueue = new LinkedBlockingQueue(); final Queue> delayedTaskQueue = new PriorityQueue>(); final ScheduledFutureTask purgeTask = new ScheduledFutureTask( this, delayedTaskQueue, Executors.callable(new PurgeTask(), null), @@ -53,10 +51,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass()); private final TaskRunner taskRunner = new TaskRunner(); - private final Object stateLock = new Object(); - + private final AtomicBoolean started = new AtomicBoolean(); volatile Thread thread; - private volatile int state = ST_NOT_STARTED; private final Future terminationFuture = new FailedFuture(this, new UnsupportedOperationException()); @@ -75,7 +71,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { * @return {@code null} if the executor thread has been interrupted or waken up. */ Runnable takeTask() { - BlockingQueue taskQueue = (BlockingQueue) this.taskQueue; + BlockingQueue taskQueue = this.taskQueue; for (;;) { ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); if (delayedTask == null) { @@ -194,6 +190,26 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { return false; } + /** + * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself. + * Because a new worker thread will be started again when a new task is submitted, this operation is only useful + * when you want to ensure that the worker thread is terminated after your application is shut + * down and there's no chance of submitting a new task afterwards. + * + * @return {@code true} if and only if the worker thread has been terminated + */ + public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException { + if (unit == null) { + throw new NullPointerException("unit"); + } + + Thread thread = this.thread; + if (thread != null) { + thread.join(unit.toMillis(timeout)); + } + return !thread.isAlive(); + } + @Override public void execute(Runnable task) { if (task == null) { @@ -304,14 +320,10 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { } private void startThread() { - synchronized (stateLock) { - if (state == ST_NOT_STARTED) { - state = ST_STARTED; - - thread = threadFactory.newThread(taskRunner); - - thread.start(); - } + if (started.compareAndSet(false, true)) { + Thread t = threadFactory.newThread(taskRunner); + t.start(); + thread = t; } } @@ -332,14 +344,33 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { } } + // Terminate if there is no task in the queue (except the purge task). if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) { - synchronized (stateLock) { - // Terminate if there is no task in the queue (except the purge task). - if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) { - state = ST_NOT_STARTED; - break; - } + // Mark the current thread as stopped. + // The following CAS must always success and must be uncontended, + // because only one thread should be running at the same time. + boolean stopped = started.compareAndSet(true, false); + assert stopped; + + // Check if there are pending entries added by execute() or schedule*() while we do CAS above. + if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) { + // A) No new task was added and thus there's nothing to handle + // -> safe to terminate because there's nothing left to do + // B) A new thread started and handled all the new tasks. + // -> safe to terminate the new thread will take care the rest + break; } + + // There are pending tasks added again. + if (!started.compareAndSet(false, true)) { + // startThread() started a new thread and set 'started' to true. + // -> terminate this thread so that the new thread reads from taskQueue exclusively. + break; + } + + // New tasks were added, but this worker was faster to set 'started' to true. + // i.e. a new worker thread was not started by startThread(). + // -> keep this thread alive to handle the newly added entries. } } } diff --git a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java index cbc65df5d5..ca37073301 100644 --- a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java @@ -86,7 +86,7 @@ public class GlobalEventExecutorTest { Thread.sleep(1500); - // Not it should be stopped. + // Now it should be stopped. assertThat(thread.isAlive(), is(false)); assertThat(e.thread, sameInstance(thread)); }