Allow blocking calls inside SingleThreadEventExecutor.addTask (#10811)

Motivation:

GlobalEventExecutor.addTask was rightfully allowed to block by commit
09d38c8. However the same should have been done for
SingleThreadEventExecutor.addTask.

BlockHound is currently intercepting that call, and as a consequence,
it prevents SingleThreadEventExecutor from working properly, if addTask is
called from a thread that cannot block.

The interception is due to LinkedBlockingQueue.offer implementation,
which uses a ReentrantLock internally.

Modifications:

* Added one BlockHound exception to
io.netty.util.internal.Hidden.NettyBlockHoundIntegration for
SingleThreadEventExecutor.addTask.
* Also added unit tests for both SingleThreadEventExecutor.addTask
and GlobalEventExecutor.addTask.

Result:

SingleThreadEventExecutor.addTask can now be invoked from any thread
when BlockHound is activated.
This commit is contained in:
Alexandre Dutra 2020-11-23 19:20:18 +01:00 committed by GitHub
parent ba83a8840f
commit 02cd85181a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 68 additions and 0 deletions

View File

@ -97,6 +97,10 @@ class Hidden {
"io.netty.util.concurrent.SingleThreadEventExecutor",
"takeTask");
builder.allowBlockingCallsInside(
"io.netty.util.concurrent.SingleThreadEventExecutor",
"addTask");
builder.allowBlockingCallsInside(
"io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$ExtendedTrustManagerVerifyCallback",
"verify");

View File

@ -52,6 +52,7 @@ import reactor.blockhound.BlockingOperationError;
import reactor.blockhound.integration.BlockHoundIntegration;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -60,7 +61,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
@ -129,6 +132,37 @@ public class NettyBlockHoundIntegrationTest {
latch.await();
}
@Test(timeout = 5000L)
public void testSingleThreadEventExecutorAddTask() throws Exception {
TestLinkedBlockingQueue<Runnable> taskQueue = new TestLinkedBlockingQueue<>();
SingleThreadEventExecutor executor =
new SingleThreadEventExecutor(null, new DefaultThreadFactory("test"), true) {
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return taskQueue;
}
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
taskQueue.emulateContention();
CountDownLatch latch = new CountDownLatch(1);
executor.submit(() -> {
executor.execute(() -> { }); // calls addTask
latch.countDown();
});
taskQueue.waitUntilContented();
taskQueue.removeContention();
latch.await();
}
@Test(timeout = 5000L)
public void testHashedWheelTimerStartStop() throws Exception {
HashedWheelTimer timer = new HashedWheelTimer();
@ -290,4 +324,34 @@ public class NettyBlockHoundIntegrationTest {
ReferenceCountUtil.release(sslClientCtx);
}
}
private static class TestLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
private final ReentrantLock lock = new ReentrantLock();
@Override
public boolean offer(T t) {
lock.lock();
try {
return super.offer(t);
} finally {
lock.unlock();
}
}
void emulateContention() {
lock.lock();
}
void waitUntilContented() throws InterruptedException {
// wait until the lock gets contended
while (lock.getQueueLength() == 0) {
Thread.sleep(10L);
}
}
void removeContention() {
lock.unlock();
}
}
}