Fix queue size

This commit is contained in:
Andrea Cavalli 2020-09-17 18:00:34 +02:00
parent a1febbc868
commit 016ac5b140
4 changed files with 53 additions and 45 deletions

View File

@ -64,13 +64,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
this(executor, maximumTaskNumber, maximumTimeout, queueSizeSupplier, null);
}
private void updateQueue() {
var queueSize = queueSizeSupplier.get();
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize);
}
}
private void preExecute(Object command) {
Objects.requireNonNull(command, "'command' must not be null");
@ -100,12 +93,38 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
public final void execute(final @NotNull Runnable command) {
preExecute(command);
super.execute(new PermitReleasingRunnableDecorator(command, () -> {
var queueSize = queueSizeSupplier.get();
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize);
}
}, this.taskLimit));
super.execute(new PermitReleasingRunnableDecorator(command, this::updateQueue, this.taskLimit));
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
preExecute(task);
return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit));
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Runnable task, T result) {
preExecute(task);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result);
}
@NotNull
@Override
public Future<?> submit(@NotNull Runnable task) {
preExecute(task);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit));
}
private void updateQueue(boolean beforeRunning) {
var queueSize = queueSizeSupplier.get() + (beforeRunning ? 1 : 0);
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize);
}
}
@ -143,30 +162,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorService
return super.awaitTermination(timeout, unit);
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
preExecute(task);
return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit));
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Runnable task, T result) {
preExecute(task);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result);
}
@NotNull
@Override
public Future<?> submit(@NotNull Runnable task) {
preExecute(task);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit));
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) {

View File

@ -7,13 +7,13 @@ import javax.annotation.Nonnull;
public final class PermitReleasingCallableDecorator<T> extends CallableDecorator<T> {
@Nonnull
private final Runnable queueSizeUpdater;
private final QueueSizeUpdater queueSizeUpdater;
@Nonnull
private final Semaphore semaphore;
PermitReleasingCallableDecorator(@Nonnull final Callable<T> task,
@Nonnull final Runnable queueSizeUpdater,
@Nonnull final QueueSizeUpdater queueSizeUpdater,
@Nonnull final Semaphore semaphoreToRelease) {
super(task);
this.queueSizeUpdater = queueSizeUpdater;
@ -23,12 +23,16 @@ public final class PermitReleasingCallableDecorator<T> extends CallableDecorator
@Override
public T call() throws Exception {
try {
queueSizeUpdater.run();
queueSizeUpdater.update(true);
} finally {
// however execution goes, release permit for next task
this.semaphore.release();
return super.call();
try {
return super.call();
} finally {
queueSizeUpdater.update(false);
}
}
}

View File

@ -6,13 +6,13 @@ import javax.annotation.Nonnull;
public final class PermitReleasingRunnableDecorator extends RunnableDecorator {
@Nonnull
private final Runnable queueSizeUpdater;
private final QueueSizeUpdater queueSizeUpdater;
@Nonnull
private final Semaphore semaphore;
PermitReleasingRunnableDecorator(@Nonnull final Runnable task,
@Nonnull final Runnable queueSizeUpdater,
@Nonnull final QueueSizeUpdater queueSizeUpdater,
@Nonnull final Semaphore semaphoreToRelease) {
super(task);
this.queueSizeUpdater = queueSizeUpdater;
@ -22,12 +22,16 @@ public final class PermitReleasingRunnableDecorator extends RunnableDecorator {
@Override
public void run() {
try {
queueSizeUpdater.run();
queueSizeUpdater.update(true);
} finally {
// however execution goes, release permit for next task
this.semaphore.release();
super.run();
try {
super.run();
} finally {
queueSizeUpdater.update(false);
}
}
}

View File

@ -0,0 +1,5 @@
package org.warp.commonutils.concurrency.executor;
public interface QueueSizeUpdater {
void update(boolean isBeforeRunning);
}