diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java index ef0b31c..d8b0b1d 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java @@ -22,27 +22,39 @@ import org.jetbrains.annotations.Nullable; public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecutorService { + private final void updateQueue() { + var queueSize = queueSizeSupplier.get(); + synchronized (queueSizeStatusLock) { + if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); + } + }; + private static final class PermitReleasingRunnableDecorator implements Runnable { @Nonnull private final Runnable delegate; + @Nonnull + private final Runnable queueSizeUpdater; + @Nonnull private final Semaphore semaphore; - private PermitReleasingRunnableDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) { + private PermitReleasingRunnableDecorator(@Nonnull final Runnable task, @Nonnull final Runnable queueSizeUpdater, @Nonnull final Semaphore semaphoreToRelease) { this.delegate = task; + this.queueSizeUpdater = queueSizeUpdater; this.semaphore = semaphoreToRelease; } @Override public void run() { try { - this.delegate.run(); - } - finally { + queueSizeUpdater.run(); + } finally { // however execution goes, release permit for next task this.semaphore.release(); + + this.delegate.run(); } } @@ -57,21 +69,27 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu @Nonnull private final Callable delegate; + @Nonnull + private final Runnable queueSizeUpdater; + @Nonnull private final Semaphore semaphore; - private PermitReleasingCallableDecorator(@Nonnull final Callable task, @Nonnull final Semaphore semaphoreToRelease) { + private PermitReleasingCallableDecorator(@Nonnull final Callable task, @Nonnull final Runnable queueSizeUpdater, @Nonnull final Semaphore semaphoreToRelease) { this.delegate = task; + this.queueSizeUpdater = queueSizeUpdater; this.semaphore = semaphoreToRelease; } @Override public T call() throws Exception { try { - return this.delegate.call(); + queueSizeUpdater.run(); } finally { // however execution goes, release permit for next task this.semaphore.release(); + + return this.delegate.call(); } } @@ -120,12 +138,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu private void preExecute(Object command) { Objects.requireNonNull(command, "'command' must not be null"); - - var queueSize = queueSizeSupplier.get(); - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); - } - if (!ignoreTaskLimit) { try { if (this.taskLimit.availablePermits() == 0) { @@ -152,7 +164,12 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu public final void execute(final Runnable command) { preExecute(command); - this.delegate.execute(new PermitReleasingRunnableDecorator(command, this.taskLimit)); + this.delegate.execute(new PermitReleasingRunnableDecorator(command, () -> { + var queueSize = queueSizeSupplier.get(); + synchronized (queueSizeStatusLock) { + if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); + } + }, this.taskLimit)); } @@ -195,7 +212,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu public Future submit(@NotNull Callable task) { preExecute(task); - return this.delegate.submit(new PermitReleasingCallableDecorator<>(task, this.taskLimit)); + return this.delegate.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit)); } @NotNull @@ -203,7 +220,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu public Future submit(@NotNull Runnable task, T result) { preExecute(task); - return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this.taskLimit), result); + return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result); } @NotNull @@ -211,7 +228,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu public Future submit(@NotNull Runnable task) { preExecute(task); - return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this.taskLimit)); + return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit)); } @NotNull