Allow blocking calls inside SingleThreadEventExecutor.addTask (#10811)

Motivation:

GlobalEventExecutor.addTask was rightfully allowed to block by commit
09d38c8. However the same should have been done for
SingleThreadEventExecutor.addTask.

BlockHound is currently intercepting that call, and as a consequence,
it prevents SingleThreadEventExecutor from working properly, if addTask is
called from a thread that cannot block.

The interception is due to LinkedBlockingQueue.offer implementation,
which uses a ReentrantLock internally.

Modifications:

* Added one BlockHound exception to
io.netty.util.internal.Hidden.NettyBlockHoundIntegration for
SingleThreadEventExecutor.addTask.
* Also added unit tests for both SingleThreadEventExecutor.addTask
and GlobalEventExecutor.addTask.

Result:

SingleThreadEventExecutor.addTask can now be invoked from any thread
when BlockHound is activated.
This commit is contained in:
Alexandre Dutra 2020-11-23 19:20:18 +01:00 committed by Norman Maurer
parent 74bf49b062
commit aab4c0c78a
2 changed files with 68 additions and 0 deletions

View File

@ -77,6 +77,10 @@ class Hidden {
builder.allowBlockingCallsInside("io.netty.util.concurrent.GlobalEventExecutor", builder.allowBlockingCallsInside("io.netty.util.concurrent.GlobalEventExecutor",
"takeTask"); "takeTask");
builder.allowBlockingCallsInside(
"io.netty.util.concurrent.SingleThreadEventExecutor",
"addTask");
builder.allowBlockingCallsInside( builder.allowBlockingCallsInside(
"io.netty.util.concurrent.SingleThreadEventExecutor", "io.netty.util.concurrent.SingleThreadEventExecutor",
"takeTask"); "takeTask");

View File

@ -52,6 +52,7 @@ import reactor.blockhound.BlockingOperationError;
import reactor.blockhound.integration.BlockHoundIntegration; import reactor.blockhound.integration.BlockHoundIntegration;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -60,7 +61,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -129,6 +132,37 @@ public class NettyBlockHoundIntegrationTest {
latch.await(); latch.await();
} }
@Test(timeout = 5000L)
public void testSingleThreadEventExecutorAddTask() throws Exception {
TestLinkedBlockingQueue<Runnable> taskQueue = new TestLinkedBlockingQueue<>();
SingleThreadEventExecutor executor =
new SingleThreadEventExecutor(new DefaultThreadFactory("test")) {
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return taskQueue;
}
@Override
protected void run() {
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
taskQueue.emulateContention();
CountDownLatch latch = new CountDownLatch(1);
executor.submit(() -> {
executor.execute(() -> { }); // calls addTask
latch.countDown();
});
taskQueue.waitUntilContented();
taskQueue.removeContention();
latch.await();
}
@Test(timeout = 5000L) @Test(timeout = 5000L)
public void testHashedWheelTimerStartStop() throws Exception { public void testHashedWheelTimerStartStop() throws Exception {
HashedWheelTimer timer = new HashedWheelTimer(); HashedWheelTimer timer = new HashedWheelTimer();
@ -290,4 +324,34 @@ public class NettyBlockHoundIntegrationTest {
ReferenceCountUtil.release(sslClientCtx); ReferenceCountUtil.release(sslClientCtx);
} }
} }
private static class TestLinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
private final ReentrantLock lock = new ReentrantLock();
@Override
public boolean offer(T t) {
lock.lock();
try {
return super.offer(t);
} finally {
lock.unlock();
}
}
void emulateContention() {
lock.lock();
}
void waitUntilContented() throws InterruptedException {
// wait until the lock gets contended
while (lock.getQueueLength() == 0) {
Thread.sleep(10L);
}
}
void removeContention() {
lock.unlock();
}
}
} }