Update BlockingOnFullQueueExecutorServiceDecorator.java

This commit is contained in:
Andrea Cavalli 2020-07-14 01:00:38 +02:00
parent 152c965200
commit 453a48e3a8
1 changed files with 33 additions and 16 deletions

View File

@ -22,27 +22,39 @@ import org.jetbrains.annotations.Nullable;
public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecutorService {
private final void updateQueue() {
var queueSize = queueSizeSupplier.get();
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize);
}
};
private static final class PermitReleasingRunnableDecorator implements Runnable {
@Nonnull
private final Runnable delegate;
@Nonnull
private final Runnable queueSizeUpdater;
@Nonnull
private final Semaphore semaphore;
private PermitReleasingRunnableDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
private PermitReleasingRunnableDecorator(@Nonnull final Runnable task, @Nonnull final Runnable queueSizeUpdater, @Nonnull final Semaphore semaphoreToRelease) {
this.delegate = task;
this.queueSizeUpdater = queueSizeUpdater;
this.semaphore = semaphoreToRelease;
}
@Override
public void run() {
try {
this.delegate.run();
}
finally {
queueSizeUpdater.run();
} finally {
// however execution goes, release permit for next task
this.semaphore.release();
this.delegate.run();
}
}
@ -57,21 +69,27 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
@Nonnull
private final Callable<T> delegate;
@Nonnull
private final Runnable queueSizeUpdater;
@Nonnull
private final Semaphore semaphore;
private PermitReleasingCallableDecorator(@Nonnull final Callable<T> task, @Nonnull final Semaphore semaphoreToRelease) {
private PermitReleasingCallableDecorator(@Nonnull final Callable<T> task, @Nonnull final Runnable queueSizeUpdater, @Nonnull final Semaphore semaphoreToRelease) {
this.delegate = task;
this.queueSizeUpdater = queueSizeUpdater;
this.semaphore = semaphoreToRelease;
}
@Override
public T call() throws Exception {
try {
return this.delegate.call();
queueSizeUpdater.run();
} finally {
// however execution goes, release permit for next task
this.semaphore.release();
return this.delegate.call();
}
}
@ -120,12 +138,6 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
private void preExecute(Object command) {
Objects.requireNonNull(command, "'command' must not be null");
var queueSize = queueSizeSupplier.get();
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize);
}
if (!ignoreTaskLimit) {
try {
if (this.taskLimit.availablePermits() == 0) {
@ -152,7 +164,12 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
public final void execute(final Runnable command) {
preExecute(command);
this.delegate.execute(new PermitReleasingRunnableDecorator(command, this.taskLimit));
this.delegate.execute(new PermitReleasingRunnableDecorator(command, () -> {
var queueSize = queueSizeSupplier.get();
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize);
}
}, this.taskLimit));
}
@ -195,7 +212,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
public <T> Future<T> submit(@NotNull Callable<T> task) {
preExecute(task);
return this.delegate.submit(new PermitReleasingCallableDecorator<>(task, this.taskLimit));
return this.delegate.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue, this.taskLimit));
}
@NotNull
@ -203,7 +220,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
public <T> Future<T> submit(@NotNull Runnable task, T result) {
preExecute(task);
return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this.taskLimit), result);
return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit), result);
}
@NotNull
@ -211,7 +228,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
public Future<?> submit(@NotNull Runnable task) {
preExecute(task);
return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this.taskLimit));
return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue, this.taskLimit));
}
@NotNull