Simplify SingleThreadEventExecutor.awaitTermination() implementation (#9081)

Motivation

A Semaphore is currently dedicated to this purpose but a simple
CountDownLatch will do.

Modification

Remove private threadLock Semaphore from SingleThreadEventExecutor and just use a CountDownLatch.

Also eliminate use of PlatformDependent.throwException() in startThread
method, and combine some nested if clauses.

Result

Cleaner EventLoop termination notification.
This commit is contained in:
Nick Hill 2019-05-27 07:05:40 -07:00 committed by Norman Maurer
parent 8b04c5ffe7
commit e1a881fa2b

View File

@ -32,11 +32,11 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -88,7 +88,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private final Executor executor; private final Executor executor;
private volatile boolean interrupted; private volatile boolean interrupted;
private final Semaphore threadLock = new Semaphore(0); private final CountDownLatch threadLock = new CountDownLatch(1);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp; private final boolean addTaskWakesUp;
private final int maxPendingTasks; private final int maxPendingTasks;
@ -740,9 +740,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
throw new IllegalStateException("cannot await termination of the current thread"); throw new IllegalStateException("cannot await termination of the current thread");
} }
if (threadLock.tryAcquire(timeout, unit)) { threadLock.await(timeout, unit);
threadLock.release();
}
return isTerminated(); return isTerminated();
} }
@ -862,11 +860,14 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private void startThread() { private void startThread() {
if (state == ST_NOT_STARTED) { if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try { try {
doStartThread(); doStartThread();
} catch (Throwable cause) { success = true;
STATE_UPDATER.set(this, ST_NOT_STARTED); } finally {
PlatformDependent.throwException(cause); if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
} }
} }
} }
@ -943,12 +944,10 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
FastThreadLocal.removeAll(); FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release(); threadLock.countDown();
if (!taskQueue.isEmpty()) { if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
if (logger.isWarnEnabled()) { logger.warn("An event executor terminated with " +
logger.warn("An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')');
"non-empty task queue (" + taskQueue.size() + ')');
}
} }
terminationFuture.setSuccess(null); terminationFuture.setSuccess(null);
} }