[#5507] SingleThreadEventExecutor should reject call invoke*() from within the EventLoop.

Motivation:

ExecutorService.invoke*(...) methods may block by API definition. This can lead to deadlocks if called from inside the EventLoop in SingleThreadEventExecutor as it only has one Thread that does all the work.

Modifications:

Throw a RejectedExectionException if someone tries to call SingleThreadEventExecutor.invoke*(...) while in the EventLoop.

Result:

No more deadlock possible.
This commit is contained in:
Norman Maurer 2016-07-07 07:50:51 +02:00
parent 50a74e95f2
commit ce95c50444
2 changed files with 114 additions and 0 deletions

View File

@ -23,17 +23,21 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.Thread.State; import java.lang.Thread.State;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@ -739,6 +743,39 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
} }
} }
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
throwIfInEventLoop("invokeAny");
return super.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throwIfInEventLoop("invokeAny");
return super.invokeAny(tasks, timeout, unit);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
throwIfInEventLoop("invokeAll");
return super.invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
throwIfInEventLoop("invokeAll");
return super.invokeAll(tasks, timeout, unit);
}
private void throwIfInEventLoop(String method) {
if (inEventLoop()) {
throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
}
}
/** /**
* Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}. * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
* If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until the * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until the

View File

@ -18,6 +18,12 @@ package io.netty.util.concurrent;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
public class SingleThreadEventExecutorTest { public class SingleThreadEventExecutorTest {
@ -49,4 +55,75 @@ public class SingleThreadEventExecutorTest {
Assert.assertTrue(threadProperties.stackTrace().length > 0); Assert.assertTrue(threadProperties.stackTrace().length > 0);
executor.shutdownGracefully(); executor.shutdownGracefully();
} }
@Test(expected = RejectedExecutionException.class, timeout = 3000)
public void testInvokeAnyInEventLoop() {
testInvokeInEventLoop(true, false);
}
@Test(expected = RejectedExecutionException.class, timeout = 3000)
public void testInvokeAnyInEventLoopWithTimeout() {
testInvokeInEventLoop(true, true);
}
@Test(expected = RejectedExecutionException.class, timeout = 3000)
public void testInvokeAllInEventLoop() {
testInvokeInEventLoop(false, false);
}
@Test(expected = RejectedExecutionException.class, timeout = 3000)
public void testInvokeAllInEventLoopWithTimeout() {
testInvokeInEventLoop(false, true);
}
private static void testInvokeInEventLoop(final boolean any, final boolean timeout) {
final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null,
Executors.defaultThreadFactory(), false) {
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
try {
final Promise<Void> promise = executor.newPromise();
executor.execute(new Runnable() {
@Override
public void run() {
try {
Set<Callable<Boolean>> set = Collections.<Callable<Boolean>>singleton(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
promise.setFailure(new AssertionError("Should never execute the Callable"));
return Boolean.TRUE;
}
});
if (any) {
if (timeout) {
executor.invokeAny(set, 10, TimeUnit.SECONDS);
} else {
executor.invokeAny(set);
}
} else {
if (timeout) {
executor.invokeAll(set, 10, TimeUnit.SECONDS);
} else {
executor.invokeAll(set);
}
}
promise.setFailure(new AssertionError("Should never reach here"));
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
});
promise.syncUninterruptibly();
} finally {
executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
}
} }