diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 510d05aea4..d1de03fe6e 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -16,6 +16,7 @@ package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; @@ -582,7 +583,18 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == ST_NOT_STARTED) { - doStartThread(); + try { + doStartThread(); + } catch (Throwable cause) { + STATE_UPDATER.set(this, ST_TERMINATED); + terminationFuture.tryFailure(cause); + + if (!(cause instanceof Exception)) { + // Also rethrow as it may be an OOME for example + PlatformDependent.throwException(cause); + } + return terminationFuture; + } } if (wakeup) { @@ -634,7 +646,18 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } if (oldState == ST_NOT_STARTED) { - doStartThread(); + try { + doStartThread(); + } catch (Throwable cause) { + STATE_UPDATER.set(this, ST_TERMINATED); + terminationFuture.tryFailure(cause); + + if (!(cause instanceof Exception)) { + // Also rethrow as it may be an OOME for example + PlatformDependent.throwException(cause); + } + return; + } } if (wakeup) { @@ -837,7 +860,12 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { - doStartThread(); + try { + doStartThread(); + } catch (Throwable cause) { + STATE_UPDATER.set(this, ST_NOT_STARTED); + PlatformDependent.throwException(cause); + } } } } diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index 3be7dc8f83..55981b2406 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -21,6 +21,8 @@ import org.junit.Test; import java.util.Collections; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -28,6 +30,48 @@ import java.util.concurrent.atomic.AtomicReference; public class SingleThreadEventExecutorTest { + @Test + public void testWrappedExecutureIsShutdown() { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, executorService, false) { + @Override + protected void run() { + while (!confirmShutdown()) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + } + } + } + }; + + executorService.shutdownNow(); + executeShouldFail(executor); + executeShouldFail(executor); + try { + executor.shutdownGracefully().syncUninterruptibly(); + Assert.fail(); + } catch (RejectedExecutionException expected) { + // expected + } + Assert.assertTrue(executor.isShutdown()); + } + + private static void executeShouldFail(Executor executor) { + try { + executor.execute(new Runnable() { + @Override + public void run() { + // Noop. + } + }); + Assert.fail(); + } catch (RejectedExecutionException expected) { + // expected + } + } + @Test public void testThreadProperties() { final AtomicReference threadRef = new AtomicReference();