From 016ac5b140bc89e47e9d001dfd4eed8199bfda36 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 17 Sep 2020 18:00:34 +0200 Subject: [PATCH] Fix queue size --- ...ngOnFullQueueExecutorServiceDecorator.java | 69 +++++++++---------- .../PermitReleasingCallableDecorator.java | 12 ++-- .../PermitReleasingRunnableDecorator.java | 12 ++-- .../executor/QueueSizeUpdater.java | 5 ++ 4 files changed, 53 insertions(+), 45 deletions(-) create mode 100644 src/main/java/org/warp/commonutils/concurrency/executor/QueueSizeUpdater.java diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java index 5eea9f7..9eb0500 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutorServiceDecorator.java @@ -64,13 +64,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService this(executor, maximumTaskNumber, maximumTimeout, queueSizeSupplier, null); } - private void updateQueue() { - var queueSize = queueSizeSupplier.get(); - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize); - } - } - private void preExecute(Object command) { Objects.requireNonNull(command, "'command' must not be null"); @@ -100,12 +93,38 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService public final void execute(final @NotNull Runnable command) { preExecute(command); - super.execute(new PermitReleasingRunnableDecorator(command, () -> { - var queueSize = queueSizeSupplier.get(); - synchronized (queueSizeStatusLock) { - if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize); - } - }, this.taskLimit)); + super.execute(new PermitReleasingRunnableDecorator(command, this::updateQueue, this.taskLimit)); + } + + @NotNull + @Override + public Future submit(@NotNull Callable task) { + preExecute(task); + + return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit)); + } + + @NotNull + @Override + public Future submit(@NotNull Runnable task, T result) { + preExecute(task); + + return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result); + } + + @NotNull + @Override + public Future submit(@NotNull Runnable task) { + preExecute(task); + + return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit)); + } + + private void updateQueue(boolean beforeRunning) { + var queueSize = queueSizeSupplier.get() + (beforeRunning ? 1 : 0); + synchronized (queueSizeStatusLock) { + if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize); + } } @@ -143,30 +162,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService return super.awaitTermination(timeout, unit); } - @NotNull - @Override - public Future submit(@NotNull Callable task) { - preExecute(task); - - return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit)); - } - - @NotNull - @Override - public Future submit(@NotNull Runnable task, T result) { - preExecute(task); - - return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result); - } - - @NotNull - @Override - public Future submit(@NotNull Runnable task) { - preExecute(task); - - return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit)); - } - @NotNull @Override public List> invokeAll(@NotNull Collection> tasks) { diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java index 99dc9f6..8d5eabb 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingCallableDecorator.java @@ -7,13 +7,13 @@ import javax.annotation.Nonnull; public final class PermitReleasingCallableDecorator extends CallableDecorator { @Nonnull - private final Runnable queueSizeUpdater; + private final QueueSizeUpdater queueSizeUpdater; @Nonnull private final Semaphore semaphore; PermitReleasingCallableDecorator(@Nonnull final Callable task, - @Nonnull final Runnable queueSizeUpdater, + @Nonnull final QueueSizeUpdater queueSizeUpdater, @Nonnull final Semaphore semaphoreToRelease) { super(task); this.queueSizeUpdater = queueSizeUpdater; @@ -23,12 +23,16 @@ public final class PermitReleasingCallableDecorator extends CallableDecorator @Override public T call() throws Exception { try { - queueSizeUpdater.run(); + queueSizeUpdater.update(true); } finally { // however execution goes, release permit for next task this.semaphore.release(); - return super.call(); + try { + return super.call(); + } finally { + queueSizeUpdater.update(false); + } } } diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java index ee77a57..fb6b988 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/PermitReleasingRunnableDecorator.java @@ -6,13 +6,13 @@ import javax.annotation.Nonnull; public final class PermitReleasingRunnableDecorator extends RunnableDecorator { @Nonnull - private final Runnable queueSizeUpdater; + private final QueueSizeUpdater queueSizeUpdater; @Nonnull private final Semaphore semaphore; PermitReleasingRunnableDecorator(@Nonnull final Runnable task, - @Nonnull final Runnable queueSizeUpdater, + @Nonnull final QueueSizeUpdater queueSizeUpdater, @Nonnull final Semaphore semaphoreToRelease) { super(task); this.queueSizeUpdater = queueSizeUpdater; @@ -22,12 +22,16 @@ public final class PermitReleasingRunnableDecorator extends RunnableDecorator { @Override public void run() { try { - queueSizeUpdater.run(); + queueSizeUpdater.update(true); } finally { // however execution goes, release permit for next task this.semaphore.release(); - super.run(); + try { + super.run(); + } finally { + queueSizeUpdater.update(false); + } } } diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/QueueSizeUpdater.java b/src/main/java/org/warp/commonutils/concurrency/executor/QueueSizeUpdater.java new file mode 100644 index 0000000..dfa82f5 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/QueueSizeUpdater.java @@ -0,0 +1,5 @@ +package org.warp.commonutils.concurrency.executor; + +public interface QueueSizeUpdater { + void update(boolean isBeforeRunning); +}