From 152c96520036f3942d3d060017c0ad3e4a196075 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 11 Jul 2020 16:21:54 +0200 Subject: [PATCH] Update ParallelUtils.java, BlockingOnFullQueueExecutorServiceDecorator.java, and 2 more files... --- .../warp/commonutils/batch/ParallelUtils.java | 4 +- ...ngOnFullQueueExecutorServiceDecorator.java | 62 ++++++++++++++++--- .../executor/BoundedExecutorService.java | 21 ++++--- .../warp/commonutils/BoundedQueueTest.java | 1 - 4 files changed, 66 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java index 033e022..0770a76 100644 --- a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -17,7 +17,7 @@ public class ParallelUtils { int parallelism, int groupSize, BiConsumer consumer) { - BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism * 2, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); + BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); @@ -57,7 +57,7 @@ public class ParallelUtils { int parallelism, int groupSize, TriConsumer consumer) { - BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism * 2, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); + BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java index 008cfcf..ef0b31c 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java @@ -14,8 +14,11 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.Supplier; import javax.annotation.Nonnull; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecutorService { @@ -78,16 +81,28 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu } } + private volatile boolean ignoreTaskLimit; + @Nonnull private final Semaphore taskLimit; @Nonnull private final Duration timeout; + private final int maximumTaskNumber; + + @Nonnull + private final Supplier queueSizeSupplier; + + private final @Nullable BiConsumer queueSizeStatus; + + @Nonnull + private final Object queueSizeStatusLock; + @Nonnull private final ExecutorService delegate; - public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { + public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier queueSizeSupplier, @Nullable BiConsumer queueSizeStatus) { this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); if (maximumTaskNumber < 1) { throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); @@ -96,21 +111,40 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu if (this.timeout.isNegative()) { throw new IllegalArgumentException("'maximumTimeout' must not be negative"); } + this.maximumTaskNumber = maximumTaskNumber; + this.queueSizeSupplier = queueSizeSupplier; + this.queueSizeStatus = queueSizeStatus; + this.queueSizeStatusLock = new Object(); this.taskLimit = new Semaphore(maximumTaskNumber); } private void preExecute(Object command) { Objects.requireNonNull(command, "'command' must not be null"); - try { - // attempt to acquire permit for task execution - if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { - throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); - } + + var queueSize = queueSizeSupplier.get(); + synchronized (queueSizeStatusLock) { + if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); } - catch (final InterruptedException e) { - // restore interrupt status - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); + + if (!ignoreTaskLimit) { + try { + if (this.taskLimit.availablePermits() == 0) { + synchronized (queueSizeStatusLock) { + if (queueSizeStatus != null) + queueSizeStatus.accept(true, + maximumTaskNumber + (taskLimit.hasQueuedThreads() ? taskLimit.getQueueLength() : 0) + ); + } + } + // attempt to acquire permit for task execution + if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { + throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); + } + } catch (final InterruptedException e) { + // restore interrupt status + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } } } @@ -124,12 +158,20 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu @Override public void shutdown() { + this.ignoreTaskLimit = true; + while (this.taskLimit.hasQueuedThreads()) { + this.taskLimit.release(10); + } this.delegate.shutdown(); } @NotNull @Override public List shutdownNow() { + this.ignoreTaskLimit = true; + while (this.taskLimit.hasQueuedThreads()) { + this.taskLimit.release(10); + } return this.delegate.shutdownNow(); } diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java index 546a710..e11b72d 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java @@ -15,46 +15,49 @@ public interface BoundedExecutorService extends ExecutorService { @Deprecated static BoundedExecutorService create(int maxQueueSize, int corePoolSize, - int maxPoolSize, long keepAliveTime, TimeUnit unit, @Nullable BiConsumer queueSizeStatus) { - return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); + return create(maxQueueSize, corePoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); } static BoundedExecutorService create(int maxQueueSize, int corePoolSize, - int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, - maxPoolSize, + corePoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(), threadFactory ); - return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxPoolSize, Duration.ofDays(1000000)); + return create(maxQueueSize, corePoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(1000000), queueSizeStatus); } static BoundedExecutorService create(int maxQueueSize, int corePoolSize, - int maxPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, Duration queueItemTtl, @Nullable BiConsumer queueSizeStatus) { + var queue = new LinkedBlockingQueue(); var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, - maxPoolSize, + corePoolSize, keepAliveTime, unit, - new LinkedBlockingQueue<>(), + queue, threadFactory ); - return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxPoolSize, queueItemTtl); + return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, + maxQueueSize, + queueItemTtl, + queue::size, + queueSizeStatus + ); } @Deprecated diff --git a/src/test/java/org/warp/commonutils/BoundedQueueTest.java b/src/test/java/org/warp/commonutils/BoundedQueueTest.java index 6ca49a6..99fa6a5 100644 --- a/src/test/java/org/warp/commonutils/BoundedQueueTest.java +++ b/src/test/java/org/warp/commonutils/BoundedQueueTest.java @@ -17,7 +17,6 @@ public class BoundedQueueTest { AtomicInteger queueSize = new AtomicInteger(); AtomicReference failedError = new AtomicReference<>(); var executor = BoundedExecutorService.create(maxQueueSize, - 1, 1, 0L, TimeUnit.MILLISECONDS,