Add awaitInactivity() to GlobalEventExecutor and ThreadDeathWatcher

Motivation:

When running Netty on a container environment, the container will often
complain about the lingering threads such as the worker threads of
ThreadDeathWatcher and GlobalEventExecutor.  We should provide an
operation that allows a use to wait until such threads are terminated.

Modifications:

- Add awaitInactivity()
- (misc) Fix typo in GlobalEventExecutorTest
- (misc) Port ThreadDeathWatch's CAS-based thread life cycle management
  to GlobalEventExecutor

Result:

- Fixes #2084
- Less overhead on task submission of GlobalEventExecutor
This commit is contained in:
Trustin Lee 2014-06-02 19:23:50 +09:00
parent 9f4543fb39
commit c827c6da37
3 changed files with 79 additions and 23 deletions

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -45,6 +46,7 @@ public final class ThreadDeathWatcher {
private static final Queue<Entry> pendingEntries = PlatformDependent.newMpscQueue(); private static final Queue<Entry> pendingEntries = PlatformDependent.newMpscQueue();
private static final Watcher watcher = new Watcher(); private static final Watcher watcher = new Watcher();
private static final AtomicBoolean started = new AtomicBoolean(); private static final AtomicBoolean started = new AtomicBoolean();
private static volatile Thread watcherThread;
/** /**
* Schedules the specified {@code task} to run when the specified {@code thread} dies. * Schedules the specified {@code task} to run when the specified {@code thread} dies.
@ -70,9 +72,32 @@ public final class ThreadDeathWatcher {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
Thread watcherThread = threadFactory.newThread(watcher); Thread watcherThread = threadFactory.newThread(watcher);
watcherThread.start(); watcherThread.start();
ThreadDeathWatcher.watcherThread = watcherThread;
} }
} }
/**
* Waits until the thread of this watcher has no threads to watch and terminates itself.
* Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)},
* this operation is only useful when you want to ensure that the watcher thread is terminated
* <strong>after</strong> your application is shut down and there's no chance of calling
* {@link #watch(Thread, Runnable)} afterwards.
*
* @return {@code true} if and only if the watcher thread has been terminated
*/
public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
throw new NullPointerException("unit");
}
Thread watcherThread = ThreadDeathWatcher.watcherThread;
if (watcherThread != null) {
watcherThread.join(unit.toMillis(timeout));
}
return !watcherThread.isAlive();
}
private ThreadDeathWatcher() { } private ThreadDeathWatcher() { }
private static final class Watcher implements Runnable { private static final class Watcher implements Runnable {

View File

@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no * Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no
@ -38,14 +39,11 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
final Queue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(); final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>(); final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>( final ScheduledFutureTask<Void> purgeTask = new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null), this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
@ -53,10 +51,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass()); private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
private final TaskRunner taskRunner = new TaskRunner(); private final TaskRunner taskRunner = new TaskRunner();
private final Object stateLock = new Object(); private final AtomicBoolean started = new AtomicBoolean();
volatile Thread thread; volatile Thread thread;
private volatile int state = ST_NOT_STARTED;
private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException()); private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
@ -70,7 +66,7 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
* @return {@code null} if the executor thread has been interrupted or waken up. * @return {@code null} if the executor thread has been interrupted or waken up.
*/ */
Runnable takeTask() { Runnable takeTask() {
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue; BlockingQueue<Runnable> taskQueue = this.taskQueue;
for (;;) { for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek(); ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) { if (delayedTask == null) {
@ -190,6 +186,26 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
return false; return false;
} }
/**
* Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
* Because a new worker thread will be started again when a new task is submitted, this operation is only useful
* when you want to ensure that the worker thread is terminated <strong>after</strong> your application is shut
* down and there's no chance of submitting a new task afterwards.
*
* @return {@code true} if and only if the worker thread has been terminated
*/
public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
throw new NullPointerException("unit");
}
Thread thread = this.thread;
if (thread != null) {
thread.join(unit.toMillis(timeout));
}
return !thread.isAlive();
}
@Override @Override
public void execute(Runnable task) { public void execute(Runnable task) {
if (task == null) { if (task == null) {
@ -300,14 +316,10 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
} }
private void startThread() { private void startThread() {
synchronized (stateLock) { if (started.compareAndSet(false, true)) {
if (state == ST_NOT_STARTED) { Thread t = threadFactory.newThread(taskRunner);
state = ST_STARTED; t.start();
thread = t;
thread = threadFactory.newThread(taskRunner);
thread.start();
}
} }
} }
@ -328,14 +340,33 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
} }
} }
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
synchronized (stateLock) {
// Terminate if there is no task in the queue (except the purge task). // Terminate if there is no task in the queue (except the purge task).
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) { if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
state = ST_NOT_STARTED; // Mark the current thread as stopped.
// The following CAS must always success and must be uncontended,
// because only one thread should be running at the same time.
boolean stopped = started.compareAndSet(true, false);
assert stopped;
// Check if there are pending entries added by execute() or schedule*() while we do CAS above.
if (taskQueue.isEmpty() && delayedTaskQueue.size() == 1) {
// A) No new task was added and thus there's nothing to handle
// -> safe to terminate because there's nothing left to do
// B) A new thread started and handled all the new tasks.
// -> safe to terminate the new thread will take care the rest
break; break;
} }
// There are pending tasks added again.
if (!started.compareAndSet(false, true)) {
// startThread() started a new thread and set 'started' to true.
// -> terminate this thread so that the new thread reads from taskQueue exclusively.
break;
} }
// New tasks were added, but this worker was faster to set 'started' to true.
// i.e. a new worker thread was not started by startThread().
// -> keep this thread alive to handle the newly added entries.
} }
} }
} }

View File

@ -86,7 +86,7 @@ public class GlobalEventExecutorTest {
Thread.sleep(1500); Thread.sleep(1500);
// Not it should be stopped. // Now it should be stopped.
assertThat(thread.isAlive(), is(false)); assertThat(thread.isAlive(), is(false));
assertThat(e.thread, sameInstance(thread)); assertThat(e.thread, sameInstance(thread));
} }