From 5ad35a157cac27004b492703708ecebc4442a55a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 11 Dec 2017 15:50:43 +0100 Subject: [PATCH] SingleThreadEventExecutor ignores startThread failures Motivation: When doStartThread throws an exception, e.g. due to the actual executor being depleted of threads and throwing in its rejected execution handler, the STEE ends up in started state anyway. If we try to execute another task in this executor, it will be queued but the thread won't be started anymore and the task will linger forever. Modifications: - Ensure we not update the internal state if the startThread() method throws. - Add testcase Result: Fixes [#7483] --- .../concurrent/SingleThreadEventExecutor.java | 34 ++++++++++++-- .../SingleThreadEventExecutorTest.java | 44 +++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 510d05aea4..d1de03fe6e 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -16,6 +16,7 @@ package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; @@ -582,7 +583,18 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == ST_NOT_STARTED) { - doStartThread(); + try { + doStartThread(); + } catch (Throwable cause) { + STATE_UPDATER.set(this, ST_TERMINATED); + terminationFuture.tryFailure(cause); + + if (!(cause instanceof Exception)) { + // Also rethrow as it may be an OOME for example + PlatformDependent.throwException(cause); + } + return terminationFuture; + } } if (wakeup) { @@ -634,7 +646,18 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } if (oldState == ST_NOT_STARTED) { - doStartThread(); + try { + doStartThread(); + } catch (Throwable cause) { + STATE_UPDATER.set(this, ST_TERMINATED); + terminationFuture.tryFailure(cause); + + if (!(cause instanceof Exception)) { + // Also rethrow as it may be an OOME for example + PlatformDependent.throwException(cause); + } + return; + } } if (wakeup) { @@ -837,7 +860,12 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { - doStartThread(); + try { + doStartThread(); + } catch (Throwable cause) { + STATE_UPDATER.set(this, ST_NOT_STARTED); + PlatformDependent.throwException(cause); + } } } } diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index 3be7dc8f83..55981b2406 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -21,6 +21,8 @@ import org.junit.Test; import java.util.Collections; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -28,6 +30,48 @@ import java.util.concurrent.atomic.AtomicReference; public class SingleThreadEventExecutorTest { + @Test + public void testWrappedExecutureIsShutdown() { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, executorService, false) { + @Override + protected void run() { + while (!confirmShutdown()) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + } + } + } + }; + + executorService.shutdownNow(); + executeShouldFail(executor); + executeShouldFail(executor); + try { + executor.shutdownGracefully().syncUninterruptibly(); + Assert.fail(); + } catch (RejectedExecutionException expected) { + // expected + } + Assert.assertTrue(executor.isShutdown()); + } + + private static void executeShouldFail(Executor executor) { + try { + executor.execute(new Runnable() { + @Override + public void run() { + // Noop. + } + }); + Assert.fail(); + } catch (RejectedExecutionException expected) { + // expected + } + } + @Test public void testThreadProperties() { final AtomicReference threadRef = new AtomicReference();