common-utils/src/main/java/org/warp/commonutils/concurrency/executor/BlockingOnFullQueueExecutor...

155 lines
4.6 KiB
Java

package org.warp.commonutils.concurrency.executor;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class BlockingOnFullQueueExecutorServiceDecorator extends ExecutorServiceDecorator {
private volatile boolean ignoreTaskLimit;
@NotNull
private final Duration timeout;
private final int maximumTaskNumber;
@NotNull
private final Supplier<Integer> queueSizeSupplier;
private final @Nullable BiConsumer<Boolean, Integer> queueSizeStatus;
public BlockingOnFullQueueExecutorServiceDecorator(@NotNull final ExecutorService executor, final int maximumTaskNumber, @NotNull final Duration maximumTimeout, @NotNull Supplier<Integer> queueSizeSupplier, @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
super(executor);
if (maximumTaskNumber < 0) {
throw new IllegalArgumentException(String.format("At least zero tasks must be permitted, not '%d'", maximumTaskNumber));
} else if (maximumTaskNumber == 0) {
ignoreTaskLimit = true;
}
this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
if (this.timeout.isNegative()) {
throw new IllegalArgumentException("'maximumTimeout' must not be negative");
}
this.maximumTaskNumber = maximumTaskNumber;
this.queueSizeSupplier = queueSizeSupplier;
this.queueSizeStatus = queueSizeStatus;
}
public BlockingOnFullQueueExecutorServiceDecorator(@NotNull final ExecutorService executor, final int maximumTaskNumber, @NotNull final Duration maximumTimeout, @NotNull Supplier<Integer> queueSizeSupplier) {
this(executor, maximumTaskNumber, maximumTimeout, queueSizeSupplier, null);
}
private void preExecute(Object command) {
Objects.requireNonNull(command, "'command' must not be null");
}
@Override
public final void execute(final @NotNull Runnable command) {
preExecute(command);
super.execute(new PermitReleasingRunnableDecorator(command, this::updateQueue));
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
preExecute(task);
return super.submit(new PermitReleasingCallableDecorator<>(task, this::updateQueue));
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Runnable task, T result) {
preExecute(task);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue), result);
}
@NotNull
@Override
public Future<?> submit(@NotNull Runnable task) {
preExecute(task);
return super.submit(new PermitReleasingRunnableDecorator(task, this::updateQueue));
}
private void updateQueue(boolean beforeRunning) {
var queueSize = queueSizeSupplier.get() + (beforeRunning ? 1 : 0);
var full = !ignoreTaskLimit && queueSize >= maximumTaskNumber;
if (queueSizeStatus != null) queueSizeStatus.accept(full, queueSize);
}
@Override
public void shutdown() {
this.ignoreTaskLimit = true;
super.shutdown();
}
void testShutdown() {
super.shutdown();
}
@NotNull
@Override
public List<Runnable> shutdownNow() {
this.ignoreTaskLimit = true;
return super.shutdownNow();
}
@Override
public boolean isShutdown() {
return super.isShutdown();
}
@Override
public boolean isTerminated() {
return super.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
return super.awaitTermination(timeout, unit);
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) {
throw new UnsupportedOperationException("invokeAll(tasks) is not supported");
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks,
long timeout,
@NotNull TimeUnit unit) {
throw new UnsupportedOperationException("invokeAll(tasks, timeout, unit) is not supported");
}
@NotNull
@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) {
throw new UnsupportedOperationException("invokeAny(tasks) is not supported");
}
@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit) {
throw new UnsupportedOperationException("invokeAny(tasks, timeout, unit) is not supported");
}
@Override
public final String toString() {
return String.format("%s[timeout='%s',delegate='%s']", getClass().getSimpleName(),
this.timeout, super.toString());
}
}