SingleThreadEventExecutor ignores startThread failures

Motivation:

When doStartThread throws an exception, e.g. due to the actual executor being depleted of threads and throwing in its rejected execution handler, the STEE ends up in started state anyway. If we try to execute another task in this executor, it will be queued but the thread won't be started anymore and the task will linger forever.

Modifications:

- Ensure we not update the internal state if the startThread() method throws.
- Add testcase

Result:

Fixes [#7483]
This commit is contained in:
Norman Maurer 2017-12-11 15:50:43 +01:00
parent 8b3e98163d
commit 5ad35a157c
2 changed files with 75 additions and 3 deletions

View File

@ -16,6 +16,7 @@
package io.netty.util.concurrent;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
@ -582,7 +583,18 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
gracefulShutdownTimeout = unit.toNanos(timeout);
if (oldState == ST_NOT_STARTED) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_TERMINATED);
terminationFuture.tryFailure(cause);
if (!(cause instanceof Exception)) {
// Also rethrow as it may be an OOME for example
PlatformDependent.throwException(cause);
}
return terminationFuture;
}
}
if (wakeup) {
@ -634,7 +646,18 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}
if (oldState == ST_NOT_STARTED) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_TERMINATED);
terminationFuture.tryFailure(cause);
if (!(cause instanceof Exception)) {
// Also rethrow as it may be an OOME for example
PlatformDependent.throwException(cause);
}
return;
}
}
if (wakeup) {
@ -837,7 +860,12 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}

View File

@ -21,6 +21,8 @@ import org.junit.Test;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -28,6 +30,48 @@ import java.util.concurrent.atomic.AtomicReference;
public class SingleThreadEventExecutorTest {
@Test
public void testWrappedExecutureIsShutdown() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, executorService, false) {
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
executorService.shutdownNow();
executeShouldFail(executor);
executeShouldFail(executor);
try {
executor.shutdownGracefully().syncUninterruptibly();
Assert.fail();
} catch (RejectedExecutionException expected) {
// expected
}
Assert.assertTrue(executor.isShutdown());
}
private static void executeShouldFail(Executor executor) {
try {
executor.execute(new Runnable() {
@Override
public void run() {
// Noop.
}
});
Assert.fail();
} catch (RejectedExecutionException expected) {
// expected
}
}
@Test
public void testThreadProperties() {
final AtomicReference<Thread> threadRef = new AtomicReference<Thread>();