diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java new file mode 100644 index 0000000..008cfcf --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java @@ -0,0 +1,207 @@ +package org.warp.commonutils.concurrency.executor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; +import org.jetbrains.annotations.NotNull; + +public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecutorService { + + private static final class PermitReleasingRunnableDecorator implements Runnable { + + @Nonnull + private final Runnable delegate; + + @Nonnull + private final Semaphore semaphore; + + private PermitReleasingRunnableDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) { + this.delegate = task; + this.semaphore = semaphoreToRelease; + } + + @Override + public void run() { + try { + this.delegate.run(); + } + finally { + // however execution goes, release permit for next task + this.semaphore.release(); + } + } + + @Override + public final String toString() { + return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); + } + } + + private static final class PermitReleasingCallableDecorator implements Callable { + + @Nonnull + private final Callable delegate; + + @Nonnull + private final Semaphore semaphore; + + private PermitReleasingCallableDecorator(@Nonnull final Callable task, @Nonnull final Semaphore semaphoreToRelease) { + this.delegate = task; + this.semaphore = semaphoreToRelease; + } + + @Override + public T call() throws Exception { + try { + return this.delegate.call(); + } finally { + // however execution goes, release permit for next task + this.semaphore.release(); + } + } + + @Override + public final String toString() { + return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); + } + } + + @Nonnull + private final Semaphore taskLimit; + + @Nonnull + private final Duration timeout; + + @Nonnull + private final ExecutorService delegate; + + public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { + this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); + if (maximumTaskNumber < 1) { + throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); + } + this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null"); + if (this.timeout.isNegative()) { + throw new IllegalArgumentException("'maximumTimeout' must not be negative"); + } + this.taskLimit = new Semaphore(maximumTaskNumber); + } + + private void preExecute(Object command) { + Objects.requireNonNull(command, "'command' must not be null"); + try { + // attempt to acquire permit for task execution + if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { + throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); + } + } + catch (final InterruptedException e) { + // restore interrupt status + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + @Override + public final void execute(final Runnable command) { + preExecute(command); + + this.delegate.execute(new PermitReleasingRunnableDecorator(command, this.taskLimit)); + } + + + @Override + public void shutdown() { + this.delegate.shutdown(); + } + + @NotNull + @Override + public List shutdownNow() { + return this.delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return this.delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return this.delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException { + return this.delegate.awaitTermination(timeout, unit); + } + + @NotNull + @Override + public Future submit(@NotNull Callable task) { + preExecute(task); + + return this.delegate.submit(new PermitReleasingCallableDecorator<>(task, this.taskLimit)); + } + + @NotNull + @Override + public Future submit(@NotNull Runnable task, T result) { + preExecute(task); + + return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this.taskLimit), result); + } + + @NotNull + @Override + public Future submit(@NotNull Runnable task) { + preExecute(task); + + return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this.taskLimit)); + } + + @NotNull + @Override + public List> invokeAll(@NotNull Collection> tasks) throws InterruptedException { + throw new UnsupportedOperationException("invokeAll(tasks) is not supported"); + } + + @NotNull + @Override + public List> invokeAll(@NotNull Collection> tasks, + long timeout, + @NotNull TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("invokeAll(tasks, timeout, unit) is not supported"); + } + + @NotNull + @Override + public T invokeAny(@NotNull Collection> tasks) + throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException("invokeAny(tasks) is not supported"); + } + + @Override + public T invokeAny(@NotNull Collection> tasks, long timeout, @NotNull TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("invokeAny(tasks, timeout, unit) is not supported"); + } + + @Override + public final String toString() { + return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), + this.timeout, this.delegate); + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java index 80fcdf6..546a710 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorService.java @@ -1,10 +1,11 @@ package org.warp.commonutils.concurrency.executor; -import java.util.concurrent.Callable; +import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import org.jetbrains.annotations.Nullable; @@ -18,9 +19,9 @@ public interface BoundedExecutorService extends ExecutorService { long keepAliveTime, TimeUnit unit, @Nullable BiConsumer queueSizeStatus) { - return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, - Executors.defaultThreadFactory(), queueSizeStatus); + return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); } + static BoundedExecutorService create(int maxQueueSize, int corePoolSize, int maxPoolSize, @@ -28,10 +29,36 @@ public interface BoundedExecutorService extends ExecutorService { TimeUnit unit, ThreadFactory threadFactory, @Nullable BiConsumer queueSizeStatus) { - return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, queueSizeStatus); + var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, + maxPoolSize, + keepAliveTime, + unit, + new LinkedBlockingQueue<>(), + threadFactory + ); + return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxPoolSize, Duration.ofDays(1000000)); } - Future submitButBlockIfFull(Callable task) throws InterruptedException; + static BoundedExecutorService create(int maxQueueSize, + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit unit, + ThreadFactory threadFactory, + Duration queueItemTtl, + @Nullable BiConsumer queueSizeStatus) { + var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, + maxPoolSize, + keepAliveTime, + unit, + new LinkedBlockingQueue<>(), + threadFactory + ); + return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxPoolSize, queueItemTtl); + } - void executeButBlockIfFull(Runnable task) throws InterruptedException; + @Deprecated + default void executeButBlockIfFull(Runnable task) throws InterruptedException { + this.execute(task); + } } diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorServiceImpl.java b/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorServiceImpl.java deleted file mode 100644 index 10e10cb..0000000 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BoundedExecutorServiceImpl.java +++ /dev/null @@ -1,115 +0,0 @@ -package org.warp.commonutils.concurrency.executor; - -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import org.jetbrains.annotations.Nullable; - -class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService { - - private final Semaphore semaphore; - private final @Nullable BiConsumer queueSizeStatus; - private final int maxQueueSize; - private final Object queueSizeStatusLock = new Object(); - - /** - * - * @param maxQueueSize - * @param corePoolSize - * @param maxPoolSize - * @param keepAliveTime - * @param unit - * @param queueSizeStatus Status. The boolean indicates if the queue is full, the integer indicates the current queue size - */ - public BoundedExecutorServiceImpl(int maxQueueSize, - int corePoolSize, - int maxPoolSize, - long keepAliveTime, - TimeUnit unit, - ThreadFactory threadFactory, - @Nullable BiConsumer queueSizeStatus) { - super(corePoolSize, maxPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(), threadFactory); - if (maxQueueSize < 0) { - throw new IllegalArgumentException(); - } - this.maxQueueSize = maxQueueSize; - this.queueSizeStatus = queueSizeStatus; - semaphore = new Semaphore(maxQueueSize); - } - - /** - * Submits task to execution pool, but blocks while number of running threads - * has reached the bound limit - */ - @Override - public Future submitButBlockIfFull(final Callable task) throws InterruptedException { - return this.submit(task); - } - - @Override - public Future submit(Runnable task) { - preChecks(); - return super.submit(task); - } - - @Override - public Future submit(Callable task) { - preChecks(); - return super.submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - preChecks(); - return super.submit(task, result); - } - - /** - * Submits task to execution pool, but blocks while number of running threads - * has reached the bound limit - */ - @Override - public void executeButBlockIfFull(final Runnable task) throws InterruptedException { - this.execute(task); - } - - @Override - public void execute(Runnable command) { - preChecks(); - super.execute(command); - } - - private void preChecks() { - try { - blockIfFull(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - protected void beforeExecute(Thread t, Runnable r) { - var queueSize = getQueue().size(); - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize); - } - - semaphore.release(); - - super.beforeExecute(t, r); - } - - private void blockIfFull() throws InterruptedException { - if (semaphore.availablePermits() == 0) { - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(true, maxQueueSize + (semaphore.hasQueuedThreads() ? semaphore.getQueueLength() : 0)); - } - } - semaphore.acquire(); - } -} \ No newline at end of file