package org.warp.commonutils.concurrency.executor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; import java.util.function.BiConsumer; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorServiceDecorator { private volatile boolean ignoreTaskLimit; private final StampedLock drainAllLock = new StampedLock(); @Nonnull private final Semaphore taskLimit; @Nonnull private final Duration timeout; private final int maximumTaskNumber; @Nonnull private final Supplier queueSizeSupplier; private final @Nullable BiConsumer queueSizeStatus; @Nonnull private final Object queueSizeStatusLock; public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier queueSizeSupplier, @Nullable BiConsumer queueSizeStatus) { super(executor); ExecutorServiceDecorator.hasDecorator(executor, this.getClass()); if (maximumTaskNumber < 0) { throw new IllegalArgumentException(String.format("At least zero tasks must be permitted, not '%d'", maximumTaskNumber)); } else if (maximumTaskNumber == 0) { ignoreTaskLimit = true; } this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null"); if (this.timeout.isNegative()) { throw new IllegalArgumentException("'maximumTimeout' must not be negative"); } this.maximumTaskNumber = maximumTaskNumber; this.queueSizeSupplier = queueSizeSupplier; this.queueSizeStatus = queueSizeStatus; this.queueSizeStatusLock = new Object(); this.taskLimit = new Semaphore(maximumTaskNumber); } public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier queueSizeSupplier) { this(executor, maximumTaskNumber, maximumTimeout, queueSizeSupplier, null); } private void updateQueue() { var queueSize = queueSizeSupplier.get(); synchronized (queueSizeStatusLock) { if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); } } private void preExecute(Object command) { Objects.requireNonNull(command, "'command' must not be null"); if (!ignoreTaskLimit) { try { if (this.taskLimit.availablePermits() == 0) { synchronized (queueSizeStatusLock) { if (queueSizeStatus != null) queueSizeStatus.accept(true, maximumTaskNumber + (taskLimit.hasQueuedThreads() ? taskLimit.getQueueLength() : 0) ); } } // attempt to acquire permit for task execution if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { throw new RejectedExecutionException(String.format("Executor '%s' busy", super.toString())); } } catch (final InterruptedException e) { // restore interrupt status Thread.currentThread().interrupt(); throw new RejectedExecutionException(e); } } } @Override public final void execute(final @NotNull Runnable command) { preExecute(command); super.execute(new PermitReleasingRunnableDecorator(command, () -> { var queueSize = queueSizeSupplier.get(); synchronized (queueSizeStatusLock) { if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize); } }, this.taskLimit)); } @Override public void shutdown() { this.ignoreTaskLimit = true; while (this.taskLimit.hasQueuedThreads()) { this.taskLimit.release(10); } super.shutdown(); } @NotNull @Override public List shutdownNow() { this.ignoreTaskLimit = true; while (this.taskLimit.hasQueuedThreads()) { this.taskLimit.release(10); } return super.shutdownNow(); } @Override public boolean isShutdown() { return super.isShutdown(); } @Override public boolean isTerminated() { return super.isTerminated(); } @Override public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException { return super.awaitTermination(timeout, unit); } @NotNull @Override public Future submit(@NotNull Callable task) { preExecute(task); return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit)); } @NotNull @Override public Future submit(@NotNull Runnable task, T result) { preExecute(task); return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result); } @NotNull @Override public Future submit(@NotNull Runnable task) { preExecute(task); return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit)); } @NotNull @Override public List> invokeAll(@NotNull Collection> tasks) { throw new UnsupportedOperationException("invokeAll(tasks) is not supported"); } @NotNull @Override public List> invokeAll(@NotNull Collection> tasks, long timeout, @NotNull TimeUnit unit) { throw new UnsupportedOperationException("invokeAll(tasks, timeout, unit) is not supported"); } @NotNull @Override public T invokeAny(@NotNull Collection> tasks) { throw new UnsupportedOperationException("invokeAny(tasks) is not supported"); } @Override public T invokeAny(@NotNull Collection> tasks, long timeout, @NotNull TimeUnit unit) { throw new UnsupportedOperationException("invokeAny(tasks, timeout, unit) is not supported"); } @Override public final String toString() { return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), this.timeout, super.toString()); } }