diff --git a/pom.xml b/pom.xml index 827e204..c752809 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ common-utils org.warp - 1.0.2 + 1.0.3 UTF-8 diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java index 4d63aee..3076fa3 100644 --- a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -18,7 +18,7 @@ public class ParallelUtils { int parallelism, int groupSize, BiConsumer consumer) { - BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); + BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); @@ -58,7 +58,7 @@ public class ParallelUtils { int parallelism, int groupSize, TriConsumer consumer) { - BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); + BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java index 871110b..2f673ff 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java @@ -14,64 +14,60 @@ public interface BoundedExecutorService extends ExecutorService { @Deprecated static ExecutorService createUnbounded( + boolean resizable, int corePoolSize, + int maxPoolSize, long keepAliveTime, TimeUnit unit, @Nullable BiConsumer queueSizeStatus) { - return create(0, corePoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); + return create(0, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); } static ExecutorService createUnbounded( + boolean resizable, int corePoolSize, + int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, - corePoolSize, - keepAliveTime, - unit, - new LinkedBlockingQueue<>(), - threadFactory - ); - return create(0, corePoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(1000000), queueSizeStatus); + return create(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus); } @Deprecated - static BoundedExecutorService create(int maxQueueSize, + static BoundedExecutorService create( + int maxQueueSize, int corePoolSize, + int maxPoolSize, long keepAliveTime, TimeUnit unit, @Nullable BiConsumer queueSizeStatus) { - return create(maxQueueSize, corePoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); + return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); } - static BoundedExecutorService create(int maxQueueSize, + static BoundedExecutorService create( + int maxQueueSize, int corePoolSize, + int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, - corePoolSize, - keepAliveTime, - unit, - new LinkedBlockingQueue<>(), - threadFactory - ); - return create(maxQueueSize, corePoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(1000000), queueSizeStatus); + return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus); } - static BoundedExecutorService create(int maxQueueSize, + static BoundedExecutorService create( + int maxQueueSize, int corePoolSize, + int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, Duration queueItemTtl, @Nullable BiConsumer queueSizeStatus) { var queue = new LinkedBlockingQueue(); - var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, - corePoolSize, + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, + maxPoolSize, keepAliveTime, unit, queue, diff --git a/src/test/java/org/warp/commonutils/BoundedQueueTest.java b/src/test/java/org/warp/commonutils/BoundedQueueTest.java index 30977da..88a7cbd 100644 --- a/src/test/java/org/warp/commonutils/BoundedQueueTest.java +++ b/src/test/java/org/warp/commonutils/BoundedQueueTest.java @@ -13,11 +13,18 @@ public class BoundedQueueTest { @Test public void testBoundedQueue() throws InterruptedException { + testBoundedQueue(1, 1); + testBoundedQueue(1, 10); + testBoundedQueue(4, 10); + } + + public void testBoundedQueue(int corePoolSize, int maxPoolSize) throws InterruptedException { int maxQueueSize = 2; AtomicInteger queueSize = new AtomicInteger(); AtomicReference failedError = new AtomicReference<>(); var executor = BoundedExecutorService.create(maxQueueSize, - 1, + corePoolSize, + maxPoolSize, 0L, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("test"),