Ensure scheduled tasks are executed before shutdown (#9858)

Motivation:

In #9603 the executor hung on shutdown because of an abandoned task
on another executor the first was waiting for.

Modifications:

This commit modifies the executor shutdown sequence to include
switching to SHUTDOWN state and then running all remaining tasks.
This ensures that no more tasks are scheduled after SHUTDOWN and
the last pass of running remaining tasks will take it all.
Any tasks scheduled after SHUTDOWN will be rejected.

This change preserves the functionality of graceful shutdown with
quiet period and only adds one more pass of task execution after
the default shutdown process has finished and the executor is
ready for termination.

Result:

After this change tasks that succeed to be added to the executor will
be always executed. Tasks which come late will be rejected instead of
abandoned.
This commit is contained in:
Robert Mihaly 2019-12-11 10:48:38 +01:00 committed by Norman Maurer
parent af58bfcfb8
commit ed16da2e84
2 changed files with 107 additions and 3 deletions

View File

@ -1009,12 +1009,28 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
} }
try { try {
// Run all remaining tasks and shutdown hooks. // Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) { for (;;) {
if (confirmShutdown()) { if (confirmShutdown()) {
break; break;
} }
} }
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally { } finally {
try { try {
cleanup(); cleanup();
@ -1027,9 +1043,10 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown(); threadLock.countDown();
if (logger.isWarnEnabled() && !taskQueue.isEmpty()) { int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " + logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')'); "non-empty task queue (" + numUserTasks + ')');
} }
terminationFuture.setSuccess(null); terminationFuture.setSuccess(null);
} }
@ -1039,6 +1056,22 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}); });
} }
final int drainTasks() {
int numTasks = 0;
for (;;) {
Runnable runnable = taskQueue.poll();
if (runnable == null) {
break;
}
// WAKEUP_TASK should be just discarded as these are added internally.
// The important bit is that we not have any user tasks left.
if (WAKEUP_TASK != runnable) {
numTasks++;
}
}
return numTasks;
}
private static final class DefaultThreadProperties implements ThreadProperties { private static final class DefaultThreadProperties implements ThreadProperties {
private final Thread t; private final Thread t;

View File

@ -31,8 +31,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
public class SingleThreadEventExecutorTest { public class SingleThreadEventExecutorTest {
@ -249,4 +251,73 @@ public class SingleThreadEventExecutorTest {
assertEquals(0, latch1.getCount()); assertEquals(0, latch1.getCount());
assertEquals(0, latch2.getCount()); assertEquals(0, latch2.getCount());
} }
@Test
public void testTaskAddedAfterShutdownNotAbandoned() throws Exception {
// A queue that doesn't support remove, so tasks once added cannot be rejected anymore
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>() {
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
};
final Runnable dummyTask = new Runnable() {
@Override
public void run() {
}
};
final LinkedBlockingQueue<Future<?>> submittedTasks = new LinkedBlockingQueue<Future<?>>();
final AtomicInteger attempts = new AtomicInteger();
final AtomicInteger rejects = new AtomicInteger();
ExecutorService executorService = Executors.newSingleThreadExecutor();
final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, executorService, false,
taskQueue, RejectedExecutionHandlers.reject()) {
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
@Override
protected boolean confirmShutdown() {
boolean result = super.confirmShutdown();
// After shutdown is confirmed, scheduled one more task and record it
if (result) {
attempts.incrementAndGet();
try {
submittedTasks.add(submit(dummyTask));
} catch (RejectedExecutionException e) {
// ignore, tasks are either accepted or rejected
rejects.incrementAndGet();
}
}
return result;
}
};
// Start the loop
executor.submit(dummyTask).sync();
// Shutdown without any quiet period
executor.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).sync();
// Ensure there are no user-tasks left.
assertEquals(0, executor.drainTasks());
// Verify that queue is empty and all attempts either succeeded or were rejected
assertTrue(taskQueue.isEmpty());
assertTrue(attempts.get() > 0);
assertEquals(attempts.get(), submittedTasks.size() + rejects.get());
for (Future<?> f : submittedTasks) {
assertTrue(f.isSuccess());
}
}
} }