125 lines
3.5 KiB
Java
125 lines
3.5 KiB
Java
package org.warp.commonutils.concurrency.executor;
|
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import org.junit.jupiter.api.Assertions;
|
|
import org.junit.jupiter.api.Test;
|
|
import org.opentest4j.AssertionFailedError;
|
|
import org.warp.commonutils.type.ShortNamedThreadFactory;
|
|
|
|
public class BoundedQueueTest {
|
|
|
|
@Test
|
|
public void testBoundedQueue() throws InterruptedException, ExecutionException {
|
|
testBoundedQueue(1, 1);
|
|
testBoundedQueue(1, 10);
|
|
testBoundedQueue(4, 10);
|
|
testBoundedQueue(0, 10);
|
|
}
|
|
|
|
public void testBoundedQueue(int corePoolSize, int maxPoolSize) throws InterruptedException, ExecutionException {
|
|
int maxQueueSize = 2;
|
|
AtomicInteger queueSize = new AtomicInteger();
|
|
AtomicReference<AssertionFailedError> failedError = new AtomicReference<>();
|
|
AtomicInteger maxRecordedCurrentQueueSize = new AtomicInteger(0);
|
|
var executor = BoundedExecutorService.create(maxQueueSize,
|
|
maxPoolSize,
|
|
0L,
|
|
TimeUnit.MILLISECONDS,
|
|
new ShortNamedThreadFactory("test"),
|
|
(isQueueFull, currentQueueSize) -> {
|
|
try {
|
|
if (currentQueueSize >= maxQueueSize) {
|
|
Assertions.assertTrue(isQueueFull);
|
|
} else {
|
|
Assertions.assertFalse(isQueueFull);
|
|
}
|
|
} catch (AssertionFailedError ex) {
|
|
if (failedError.get() == null) {
|
|
failedError.set(ex);
|
|
}
|
|
ex.printStackTrace();
|
|
}
|
|
}
|
|
);
|
|
|
|
for (int i = 0; i < 10000; i++) {
|
|
queueSize.incrementAndGet();
|
|
executor.execute(queueSize::decrementAndGet);
|
|
}
|
|
|
|
executor.testShutdown();
|
|
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
|
|
Assertions.fail("Not terminated");
|
|
}
|
|
|
|
Assertions.assertNull(failedError.get());
|
|
}
|
|
|
|
@Test
|
|
public void testBoundedQueueMaxPoolSize1_1() throws InterruptedException, ExecutionException {
|
|
testBoundedQueueMaxPoolSize( 1, 1);
|
|
}
|
|
|
|
@Test
|
|
public void testBoundedQueueMaxPoolSize10_10() throws InterruptedException, ExecutionException {
|
|
testBoundedQueueMaxPoolSize( 10, 10);
|
|
}
|
|
|
|
@Test
|
|
public void testBoundedQueueMaxPoolSize10_1() throws InterruptedException, ExecutionException {
|
|
testBoundedQueueMaxPoolSize( 10, 1);
|
|
}
|
|
|
|
@Test
|
|
public void testBoundedQueueMaxPoolSize1_10() throws InterruptedException, ExecutionException {
|
|
testBoundedQueueMaxPoolSize( 1, 10);
|
|
}
|
|
|
|
@Test
|
|
public void testBoundedQueueMaxPoolSize4_10() throws InterruptedException, ExecutionException {
|
|
testBoundedQueueMaxPoolSize( 4, 10);
|
|
}
|
|
|
|
public void testBoundedQueueMaxPoolSize(int maxPoolSize, int maxQueueSize) throws InterruptedException, ExecutionException {
|
|
CountDownLatch allFilled = new CountDownLatch(maxPoolSize);
|
|
var executor = BoundedExecutorService.create(maxQueueSize,
|
|
maxPoolSize,
|
|
0L,
|
|
TimeUnit.MILLISECONDS,
|
|
new ShortNamedThreadFactory("test"),
|
|
(isQueueFull, currentQueueSize) -> {
|
|
|
|
}
|
|
);
|
|
|
|
AtomicReference<InterruptedException> failedError = new AtomicReference<>();
|
|
for (int i = 0; i < maxPoolSize; i++) {
|
|
executor.execute(() -> {
|
|
allFilled.countDown();
|
|
try {
|
|
allFilled.await();
|
|
} catch (InterruptedException ex) {
|
|
if (failedError.get() == null) {
|
|
failedError.set(ex);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
if (!allFilled.await(10, TimeUnit.SECONDS)) {
|
|
Assertions.fail("Not reached max pool size");
|
|
}
|
|
|
|
executor.testShutdown();
|
|
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
|
|
Assertions.fail("Not terminated");
|
|
}
|
|
|
|
Assertions.assertNull(failedError.get());
|
|
}
|
|
}
|