diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 13bbfde258..969c15b017 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -634,6 +634,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im * This method must be called from the {@link EventExecutor} thread. */ protected final boolean confirmShutdown() { + return confirmShutdown0(); + } + + boolean confirmShutdown0() { assert inEventLoop(); if (!isShuttingDown()) { @@ -871,12 +875,28 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } try { - // Run all remaining tasks and shutdown hooks. + // Run all remaining tasks and shutdown hooks. At this point the event loop + // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for + // graceful shutdown with quietPeriod. for (;;) { if (confirmShutdown()) { break; } } + + // Now we want to make sure no more tasks can be added from this point. This is + // achieved by switching the state. Any new tasks beyond this point will be rejected. + for (;;) { + int oldState = state; + if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet( + SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) { + break; + } + } + + // We have the final set of tasks in the queue now, no more can be added, run all remaining. + // No need to loop here, this is the final pass. + confirmShutdown(); } finally { try { cleanup(); @@ -889,9 +909,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.countDown(); - if (logger.isWarnEnabled() && !taskQueue.isEmpty()) { + int numUserTasks = drainTasks(); + if (numUserTasks > 0 && logger.isWarnEnabled()) { logger.warn("An event executor terminated with " + - "non-empty task queue (" + taskQueue.size() + ')'); + "non-empty task queue (" + numUserTasks + ')'); } terminationFuture.setSuccess(null); } @@ -900,6 +921,22 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im }); } + final int drainTasks() { + int numTasks = 0; + for (;;) { + Runnable runnable = taskQueue.poll(); + if (runnable == null) { + break; + } + // WAKEUP_TASK should be just discarded as these are added internally. + // The important bit is that we not have any user tasks left. + if (WAKEUP_TASK != runnable) { + numTasks++; + } + } + return numTasks; + } + private static final class DefaultThreadProperties implements ThreadProperties { private final Thread t; diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index cc398e8857..0e2f7759ba 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -20,14 +20,17 @@ import org.junit.Assert; import org.junit.Test; import java.util.Collections; +import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class SingleThreadEventExecutorTest { @@ -178,4 +181,79 @@ public class SingleThreadEventExecutorTest { executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } } + + @Test + public void testTaskAddedAfterShutdownNotAbandoned() throws Exception { + + // A queue that doesn't support remove, so tasks once added cannot be rejected anymore + LinkedBlockingQueue taskQueue = new LinkedBlockingQueue() { + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + }; + + final Runnable dummyTask = new Runnable() { + @Override + public void run() { + } + }; + + final LinkedBlockingQueue> submittedTasks = new LinkedBlockingQueue>(); + final AtomicInteger attempts = new AtomicInteger(); + final AtomicInteger rejects = new AtomicInteger(); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(executorService, Integer.MAX_VALUE, + RejectedExecutionHandlers.reject()) { + + @Override + protected Queue newTaskQueue(int maxPendingTasks) { + return taskQueue; + } + + @Override + protected void run() { + while (!confirmShutdown()) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + } + } + } + + @Override + protected boolean confirmShutdown0() { + boolean result = super.confirmShutdown0(); + // After shutdown is confirmed, scheduled one more task and record it + if (result) { + attempts.incrementAndGet(); + try { + submittedTasks.add(submit(dummyTask)); + } catch (RejectedExecutionException e) { + // ignore, tasks are either accepted or rejected + rejects.incrementAndGet(); + } + } + return result; + } + }; + + // Start the loop + executor.submit(dummyTask).sync(); + + // Shutdown without any quiet period + executor.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).sync(); + + // Ensure there are no user-tasks left. + Assert.assertEquals(0, executor.drainTasks()); + + // Verify that queue is empty and all attempts either succeeded or were rejected + Assert.assertTrue(taskQueue.isEmpty()); + Assert.assertTrue(attempts.get() > 0); + Assert.assertEquals(attempts.get(), submittedTasks.size() + rejects.get()); + for (Future f : submittedTasks) { + Assert.assertTrue(f.isSuccess()); + } + } }