Ensure scheduled tasks are executed before shutdown (#9858)
Motivation: In #9603 the executor hung on shutdown because of an abandoned task on another executor the first was waiting for. Modifications: This commit modifies the executor shutdown sequence to include switching to SHUTDOWN state and then running all remaining tasks. This ensures that no more tasks are scheduled after SHUTDOWN and the last pass of running remaining tasks will take it all. Any tasks scheduled after SHUTDOWN will be rejected. This change preserves the functionality of graceful shutdown with quiet period and only adds one more pass of task execution after the default shutdown process has finished and the executor is ready for termination. Result: After this change tasks that succeed to be added to the executor will be always executed. Tasks which come late will be rejected instead of abandoned.
This commit is contained in:
parent
74c0e8cdc8
commit
d7bb05b1ac
@ -634,6 +634,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
|
||||
* This method must be called from the {@link EventExecutor} thread.
|
||||
*/
|
||||
protected final boolean confirmShutdown() {
|
||||
return confirmShutdown0();
|
||||
}
|
||||
|
||||
boolean confirmShutdown0() {
|
||||
assert inEventLoop();
|
||||
|
||||
if (!isShuttingDown()) {
|
||||
@ -871,12 +875,28 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
|
||||
}
|
||||
|
||||
try {
|
||||
// Run all remaining tasks and shutdown hooks.
|
||||
// Run all remaining tasks and shutdown hooks. At this point the event loop
|
||||
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
|
||||
// graceful shutdown with quietPeriod.
|
||||
for (;;) {
|
||||
if (confirmShutdown()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Now we want to make sure no more tasks can be added from this point. This is
|
||||
// achieved by switching the state. Any new tasks beyond this point will be rejected.
|
||||
for (;;) {
|
||||
int oldState = state;
|
||||
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
|
||||
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
|
||||
// No need to loop here, this is the final pass.
|
||||
confirmShutdown();
|
||||
} finally {
|
||||
try {
|
||||
cleanup();
|
||||
@ -889,9 +909,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
|
||||
|
||||
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
|
||||
threadLock.countDown();
|
||||
if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
|
||||
int numUserTasks = drainTasks();
|
||||
if (numUserTasks > 0 && logger.isWarnEnabled()) {
|
||||
logger.warn("An event executor terminated with " +
|
||||
"non-empty task queue (" + taskQueue.size() + ')');
|
||||
"non-empty task queue (" + numUserTasks + ')');
|
||||
}
|
||||
terminationFuture.setSuccess(null);
|
||||
}
|
||||
@ -900,6 +921,22 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
|
||||
});
|
||||
}
|
||||
|
||||
final int drainTasks() {
|
||||
int numTasks = 0;
|
||||
for (;;) {
|
||||
Runnable runnable = taskQueue.poll();
|
||||
if (runnable == null) {
|
||||
break;
|
||||
}
|
||||
// WAKEUP_TASK should be just discarded as these are added internally.
|
||||
// The important bit is that we not have any user tasks left.
|
||||
if (WAKEUP_TASK != runnable) {
|
||||
numTasks++;
|
||||
}
|
||||
}
|
||||
return numTasks;
|
||||
}
|
||||
|
||||
private static final class DefaultThreadProperties implements ThreadProperties {
|
||||
private final Thread t;
|
||||
|
||||
|
@ -20,14 +20,17 @@ import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class SingleThreadEventExecutorTest {
|
||||
@ -178,4 +181,79 @@ public class SingleThreadEventExecutorTest {
|
||||
executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskAddedAfterShutdownNotAbandoned() throws Exception {
|
||||
|
||||
// A queue that doesn't support remove, so tasks once added cannot be rejected anymore
|
||||
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>() {
|
||||
@Override
|
||||
public boolean remove(Object o) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
|
||||
final Runnable dummyTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
};
|
||||
|
||||
final LinkedBlockingQueue<Future<?>> submittedTasks = new LinkedBlockingQueue<Future<?>>();
|
||||
final AtomicInteger attempts = new AtomicInteger();
|
||||
final AtomicInteger rejects = new AtomicInteger();
|
||||
|
||||
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(executorService, Integer.MAX_VALUE,
|
||||
RejectedExecutionHandlers.reject()) {
|
||||
|
||||
@Override
|
||||
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
|
||||
return taskQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run() {
|
||||
while (!confirmShutdown()) {
|
||||
Runnable task = takeTask();
|
||||
if (task != null) {
|
||||
task.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean confirmShutdown0() {
|
||||
boolean result = super.confirmShutdown0();
|
||||
// After shutdown is confirmed, scheduled one more task and record it
|
||||
if (result) {
|
||||
attempts.incrementAndGet();
|
||||
try {
|
||||
submittedTasks.add(submit(dummyTask));
|
||||
} catch (RejectedExecutionException e) {
|
||||
// ignore, tasks are either accepted or rejected
|
||||
rejects.incrementAndGet();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
// Start the loop
|
||||
executor.submit(dummyTask).sync();
|
||||
|
||||
// Shutdown without any quiet period
|
||||
executor.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).sync();
|
||||
|
||||
// Ensure there are no user-tasks left.
|
||||
Assert.assertEquals(0, executor.drainTasks());
|
||||
|
||||
// Verify that queue is empty and all attempts either succeeded or were rejected
|
||||
Assert.assertTrue(taskQueue.isEmpty());
|
||||
Assert.assertTrue(attempts.get() > 0);
|
||||
Assert.assertEquals(attempts.get(), submittedTasks.size() + rejects.get());
|
||||
for (Future<?> f : submittedTasks) {
|
||||
Assert.assertTrue(f.isSuccess());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user