From 72bdf4a309b4280e4f2d7f97e3029c19c96c9c47 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 17 Jul 2020 15:33:09 +0200 Subject: [PATCH] Fixed bounded executor service --- .../warp/commonutils/batch/ParallelUtils.java | 9 +++--- ...ngOnFullQueueExecutorServiceDecorator.java | 10 ++++--- .../executor/BoundedExecutorService.java | 30 +++++++++++++++---- .../warp/commonutils/BoundedQueueTest.java | 2 +- 4 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java index 0770a76..4d63aee 100644 --- a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -1,6 +1,7 @@ package org.warp.commonutils.batch; import java.util.concurrent.CompletionException; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -33,13 +34,13 @@ public class ParallelUtils { keys.var = new Object[CHUNK_SIZE]; values.var = new Object[CHUNK_SIZE]; try { - parallelExecutor.executeButBlockIfFull(() -> { + parallelExecutor.execute(() -> { for (int i = 0; i < CHUNK_SIZE; i++) { //noinspection unchecked consumer.accept((K) keysCopy[i], (V) valuesCopy[i]); } }); - } catch (InterruptedException e) { + } catch (RejectedExecutionException e) { throw new CompletionException(e); } } @@ -77,13 +78,13 @@ public class ParallelUtils { keys2.var = new Object[CHUNK_SIZE]; values.var = new Object[CHUNK_SIZE]; try { - parallelExecutor.executeButBlockIfFull(() -> { + parallelExecutor.execute(() -> { for (int i = 0; i < CHUNK_SIZE; i++) { //noinspection unchecked consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]); } }); - } catch (InterruptedException e) { + } catch (RejectedExecutionException e) { throw new CompletionException(e); } } diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java index d8b0b1d..05d3c3d 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java @@ -122,8 +122,10 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier queueSizeSupplier, @Nullable BiConsumer queueSizeStatus) { this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); - if (maximumTaskNumber < 1) { - throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); + if (maximumTaskNumber < 0) { + throw new IllegalArgumentException(String.format("At least zero tasks must be permitted, not '%d'", maximumTaskNumber)); + } else if (maximumTaskNumber == 0) { + ignoreTaskLimit = true; } this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null"); if (this.timeout.isNegative()) { @@ -155,7 +157,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu } catch (final InterruptedException e) { // restore interrupt status Thread.currentThread().interrupt(); - throw new IllegalStateException(e); + throw new RejectedExecutionException(e); } } } @@ -167,7 +169,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu this.delegate.execute(new PermitReleasingRunnableDecorator(command, () -> { var queueSize = queueSizeSupplier.get(); synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); + if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize); } }, this.taskLimit)); } diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java index e11b72d..871110b 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java @@ -12,6 +12,31 @@ import org.jetbrains.annotations.Nullable; public interface BoundedExecutorService extends ExecutorService { + @Deprecated + static ExecutorService createUnbounded( + int corePoolSize, + long keepAliveTime, + TimeUnit unit, + @Nullable BiConsumer queueSizeStatus) { + return create(0, corePoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); + } + + static ExecutorService createUnbounded( + int corePoolSize, + long keepAliveTime, + TimeUnit unit, + ThreadFactory threadFactory, + @Nullable BiConsumer queueSizeStatus) { + var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, + corePoolSize, + keepAliveTime, + unit, + new LinkedBlockingQueue<>(), + threadFactory + ); + return create(0, corePoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(1000000), queueSizeStatus); + } + @Deprecated static BoundedExecutorService create(int maxQueueSize, int corePoolSize, @@ -59,9 +84,4 @@ public interface BoundedExecutorService extends ExecutorService { queueSizeStatus ); } - - @Deprecated - default void executeButBlockIfFull(Runnable task) throws InterruptedException { - this.execute(task); - } } diff --git a/src/test/java/org/warp/commonutils/BoundedQueueTest.java b/src/test/java/org/warp/commonutils/BoundedQueueTest.java index 99fa6a5..30977da 100644 --- a/src/test/java/org/warp/commonutils/BoundedQueueTest.java +++ b/src/test/java/org/warp/commonutils/BoundedQueueTest.java @@ -39,7 +39,7 @@ public class BoundedQueueTest { for (int i = 0; i < 10000; i++) { queueSize.incrementAndGet(); - executor.executeButBlockIfFull(queueSize::decrementAndGet); + executor.execute(queueSize::decrementAndGet); } executor.shutdown();