From d7bb05b1ac01e1a6b19559deb51e6db27c9e6f0d Mon Sep 17 00:00:00 2001 From: Robert Mihaly Date: Wed, 11 Dec 2019 10:48:38 +0100 Subject: [PATCH] Ensure scheduled tasks are executed before shutdown (#9858) Motivation: In #9603 the executor hung on shutdown because of an abandoned task on another executor the first was waiting for. Modifications: This commit modifies the executor shutdown sequence to include switching to SHUTDOWN state and then running all remaining tasks. This ensures that no more tasks are scheduled after SHUTDOWN and the last pass of running remaining tasks will take it all. Any tasks scheduled after SHUTDOWN will be rejected. This change preserves the functionality of graceful shutdown with quiet period and only adds one more pass of task execution after the default shutdown process has finished and the executor is ready for termination. Result: After this change tasks that succeed to be added to the executor will be always executed. Tasks which come late will be rejected instead of abandoned. --- .../concurrent/SingleThreadEventExecutor.java | 43 +++++++++- .../SingleThreadEventExecutorTest.java | 78 +++++++++++++++++++ 2 files changed, 118 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 13bbfde258..969c15b017 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -634,6 +634,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im * This method must be called from the {@link EventExecutor} thread. */ protected final boolean confirmShutdown() { + return confirmShutdown0(); + } + + boolean confirmShutdown0() { assert inEventLoop(); if (!isShuttingDown()) { @@ -871,12 +875,28 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } try { - // Run all remaining tasks and shutdown hooks. + // Run all remaining tasks and shutdown hooks. At this point the event loop + // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for + // graceful shutdown with quietPeriod. for (;;) { if (confirmShutdown()) { break; } } + + // Now we want to make sure no more tasks can be added from this point. This is + // achieved by switching the state. Any new tasks beyond this point will be rejected. + for (;;) { + int oldState = state; + if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet( + SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) { + break; + } + } + + // We have the final set of tasks in the queue now, no more can be added, run all remaining. + // No need to loop here, this is the final pass. + confirmShutdown(); } finally { try { cleanup(); @@ -889,9 +909,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.countDown(); - if (logger.isWarnEnabled() && !taskQueue.isEmpty()) { + int numUserTasks = drainTasks(); + if (numUserTasks > 0 && logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + - "non-empty task queue (" + taskQueue.size() + ')'); + "non-empty task queue (" + numUserTasks + ')'); } terminationFuture.setSuccess(null); } @@ -900,6 +921,22 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im }); } + final int drainTasks() { + int numTasks = 0; + for (;;) { + Runnable runnable = taskQueue.poll(); + if (runnable == null) { + break; + } + // WAKEUP_TASK should be just discarded as these are added internally. + // The important bit is that we not have any user tasks left. + if (WAKEUP_TASK != runnable) { + numTasks++; + } + } + return numTasks; + } + private static final class DefaultThreadProperties implements ThreadProperties { private final Thread t; diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index cc398e8857..0e2f7759ba 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -20,14 +20,17 @@ import org.junit.Assert; import org.junit.Test; import java.util.Collections; +import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class SingleThreadEventExecutorTest { @@ -178,4 +181,79 @@ public class SingleThreadEventExecutorTest { executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } } + + @Test + public void testTaskAddedAfterShutdownNotAbandoned() throws Exception { + + // A queue that doesn't support remove, so tasks once added cannot be rejected anymore + LinkedBlockingQueue taskQueue = new LinkedBlockingQueue() { + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + }; + + final Runnable dummyTask = new Runnable() { + @Override + public void run() { + } + }; + + final LinkedBlockingQueue> submittedTasks = new LinkedBlockingQueue>(); + final AtomicInteger attempts = new AtomicInteger(); + final AtomicInteger rejects = new AtomicInteger(); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(executorService, Integer.MAX_VALUE, + RejectedExecutionHandlers.reject()) { + + @Override + protected Queue newTaskQueue(int maxPendingTasks) { + return taskQueue; + } + + @Override + protected void run() { + while (!confirmShutdown()) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + } + } + } + + @Override + protected boolean confirmShutdown0() { + boolean result = super.confirmShutdown0(); + // After shutdown is confirmed, scheduled one more task and record it + if (result) { + attempts.incrementAndGet(); + try { + submittedTasks.add(submit(dummyTask)); + } catch (RejectedExecutionException e) { + // ignore, tasks are either accepted or rejected + rejects.incrementAndGet(); + } + } + return result; + } + }; + + // Start the loop + executor.submit(dummyTask).sync(); + + // Shutdown without any quiet period + executor.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).sync(); + + // Ensure there are no user-tasks left. + Assert.assertEquals(0, executor.drainTasks()); + + // Verify that queue is empty and all attempts either succeeded or were rejected + Assert.assertTrue(taskQueue.isEmpty()); + Assert.assertTrue(attempts.get() > 0); + Assert.assertEquals(attempts.get(), submittedTasks.size() + rejects.get()); + for (Future f : submittedTasks) { + Assert.assertTrue(f.isSuccess()); + } + } }