diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 4b1d75a564..f7ab9aca05 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -1009,12 +1009,28 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } try { - // Run all remaining tasks and shutdown hooks. + // Run all remaining tasks and shutdown hooks. At this point the event loop + // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for + // graceful shutdown with quietPeriod. for (;;) { if (confirmShutdown()) { break; } } + + // Now we want to make sure no more tasks can be added from this point. This is + // achieved by switching the state. Any new tasks beyond this point will be rejected. + for (;;) { + int oldState = state; + if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet( + SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) { + break; + } + } + + // We have the final set of tasks in the queue now, no more can be added, run all remaining. + // No need to loop here, this is the final pass. + confirmShutdown(); } finally { try { cleanup(); @@ -1027,9 +1043,10 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.countDown(); - if (logger.isWarnEnabled() && !taskQueue.isEmpty()) { + int numUserTasks = drainTasks(); + if (numUserTasks > 0 && logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + - "non-empty task queue (" + taskQueue.size() + ')'); + "non-empty task queue (" + numUserTasks + ')'); } terminationFuture.setSuccess(null); } @@ -1039,6 +1056,22 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx }); } + final int drainTasks() { + int numTasks = 0; + for (;;) { + Runnable runnable = taskQueue.poll(); + if (runnable == null) { + break; + } + // WAKEUP_TASK should be just discarded as these are added internally. + // The important bit is that we not have any user tasks left. + if (WAKEUP_TASK != runnable) { + numTasks++; + } + } + return numTasks; + } + private static final class DefaultThreadProperties implements ThreadProperties { private final Thread t; diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index 861669ab0e..735be2bedd 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -31,8 +31,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class SingleThreadEventExecutorTest { @@ -249,4 +251,73 @@ public class SingleThreadEventExecutorTest { assertEquals(0, latch1.getCount()); assertEquals(0, latch2.getCount()); } + + @Test + public void testTaskAddedAfterShutdownNotAbandoned() throws Exception { + + // A queue that doesn't support remove, so tasks once added cannot be rejected anymore + LinkedBlockingQueue taskQueue = new LinkedBlockingQueue() { + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + }; + + final Runnable dummyTask = new Runnable() { + @Override + public void run() { + } + }; + + final LinkedBlockingQueue> submittedTasks = new LinkedBlockingQueue>(); + final AtomicInteger attempts = new AtomicInteger(); + final AtomicInteger rejects = new AtomicInteger(); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, executorService, false, + taskQueue, RejectedExecutionHandlers.reject()) { + @Override + protected void run() { + while (!confirmShutdown()) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + } + } + } + + @Override + protected boolean confirmShutdown() { + boolean result = super.confirmShutdown(); + // After shutdown is confirmed, scheduled one more task and record it + if (result) { + attempts.incrementAndGet(); + try { + submittedTasks.add(submit(dummyTask)); + } catch (RejectedExecutionException e) { + // ignore, tasks are either accepted or rejected + rejects.incrementAndGet(); + } + } + return result; + } + }; + + // Start the loop + executor.submit(dummyTask).sync(); + + // Shutdown without any quiet period + executor.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).sync(); + + // Ensure there are no user-tasks left. + assertEquals(0, executor.drainTasks()); + + // Verify that queue is empty and all attempts either succeeded or were rejected + assertTrue(taskQueue.isEmpty()); + assertTrue(attempts.get() > 0); + assertEquals(attempts.get(), submittedTasks.size() + rejects.get()); + for (Future f : submittedTasks) { + assertTrue(f.isSuccess()); + } + } }