Simplify SingleThreadEventExecutor.awaitTermination() implementation (#9081)

Motivation

A Semaphore is currently dedicated to this purpose but a simple
CountDownLatch will do.

Modification

Remove private threadLock Semaphore from SingleThreadEventExecutor and just use a CountDownLatch.

Also eliminate use of PlatformDependent.throwException() in startThread
method, and combine some nested if clauses.

Result

Cleaner EventLoop termination notification.
This commit is contained in:
Nick Hill 2019-05-27 07:05:40 -07:00 committed by Norman Maurer
parent be7c808ce1
commit ed23cfae5e

View File

@ -32,11 +32,11 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -82,7 +82,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
private final Executor executor;
private volatile boolean interrupted;
private final Semaphore threadLock = new Semaphore(0);
private final CountDownLatch threadLock = new CountDownLatch(1);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<>();
private final boolean addTaskWakesUp;
private final RejectedExecutionHandler rejectedExecutionHandler;
@ -688,9 +688,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
throw new IllegalStateException("cannot await termination of the current thread");
}
if (threadLock.tryAcquire(timeout, unit)) {
threadLock.release();
}
threadLock.await(timeout, unit);
return isTerminated();
}
@ -802,11 +800,14 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
@ -881,13 +882,11 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
if (logger.isWarnEnabled()) {
threadLock.countDown();
if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
}
terminationFuture.setSuccess(null);
}
}