Update BlockingOnFullQueueExecutorServiceDecorator.java, BoundedExecutorService.java, and BoundedExecutorServiceImpl.java

This commit is contained in:
Andrea Cavalli 2020-07-11 15:48:39 +02:00
parent 988e104173
commit 3ca4ea9760
3 changed files with 241 additions and 122 deletions

View File

@ -0,0 +1,207 @@
package org.warp.commonutils.concurrency.executor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.jetbrains.annotations.NotNull;
public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecutorService {
private static final class PermitReleasingRunnableDecorator implements Runnable {
@Nonnull
private final Runnable delegate;
@Nonnull
private final Semaphore semaphore;
private PermitReleasingRunnableDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
this.delegate = task;
this.semaphore = semaphoreToRelease;
}
@Override
public void run() {
try {
this.delegate.run();
}
finally {
// however execution goes, release permit for next task
this.semaphore.release();
}
}
@Override
public final String toString() {
return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
}
}
private static final class PermitReleasingCallableDecorator<T> implements Callable<T> {
@Nonnull
private final Callable<T> delegate;
@Nonnull
private final Semaphore semaphore;
private PermitReleasingCallableDecorator(@Nonnull final Callable<T> task, @Nonnull final Semaphore semaphoreToRelease) {
this.delegate = task;
this.semaphore = semaphoreToRelease;
}
@Override
public T call() throws Exception {
try {
return this.delegate.call();
} finally {
// however execution goes, release permit for next task
this.semaphore.release();
}
}
@Override
public final String toString() {
return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
}
}
@Nonnull
private final Semaphore taskLimit;
@Nonnull
private final Duration timeout;
@Nonnull
private final ExecutorService delegate;
public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
if (maximumTaskNumber < 1) {
throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
}
this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
if (this.timeout.isNegative()) {
throw new IllegalArgumentException("'maximumTimeout' must not be negative");
}
this.taskLimit = new Semaphore(maximumTaskNumber);
}
private void preExecute(Object command) {
Objects.requireNonNull(command, "'command' must not be null");
try {
// attempt to acquire permit for task execution
if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
}
}
catch (final InterruptedException e) {
// restore interrupt status
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
@Override
public final void execute(final Runnable command) {
preExecute(command);
this.delegate.execute(new PermitReleasingRunnableDecorator(command, this.taskLimit));
}
@Override
public void shutdown() {
this.delegate.shutdown();
}
@NotNull
@Override
public List<Runnable> shutdownNow() {
return this.delegate.shutdownNow();
}
@Override
public boolean isShutdown() {
return this.delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return this.delegate.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
return this.delegate.awaitTermination(timeout, unit);
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Callable<T> task) {
preExecute(task);
return this.delegate.submit(new PermitReleasingCallableDecorator<>(task, this.taskLimit));
}
@NotNull
@Override
public <T> Future<T> submit(@NotNull Runnable task, T result) {
preExecute(task);
return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this.taskLimit), result);
}
@NotNull
@Override
public Future<?> submit(@NotNull Runnable task) {
preExecute(task);
return this.delegate.submit(new PermitReleasingRunnableDecorator(task, this.taskLimit));
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) throws InterruptedException {
throw new UnsupportedOperationException("invokeAll(tasks) is not supported");
}
@NotNull
@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks,
long timeout,
@NotNull TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException("invokeAll(tasks, timeout, unit) is not supported");
}
@NotNull
@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException("invokeAny(tasks) is not supported");
}
@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks, long timeout, @NotNull TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException("invokeAny(tasks, timeout, unit) is not supported");
}
@Override
public final String toString() {
return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
this.timeout, this.delegate);
}
}

View File

@ -1,10 +1,11 @@
package org.warp.commonutils.concurrency.executor; package org.warp.commonutils.concurrency.executor;
import java.util.concurrent.Callable; import java.time.Duration;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -18,9 +19,9 @@ public interface BoundedExecutorService extends ExecutorService {
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
Executors.defaultThreadFactory(), queueSizeStatus);
} }
static BoundedExecutorService create(int maxQueueSize, static BoundedExecutorService create(int maxQueueSize,
int corePoolSize, int corePoolSize,
int maxPoolSize, int maxPoolSize,
@ -28,10 +29,36 @@ public interface BoundedExecutorService extends ExecutorService {
TimeUnit unit, TimeUnit unit,
ThreadFactory threadFactory, ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return new BoundedExecutorServiceImpl(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, queueSizeStatus); var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
threadFactory
);
return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxPoolSize, Duration.ofDays(1000000));
} }
<T> Future<T> submitButBlockIfFull(Callable<T> task) throws InterruptedException; static BoundedExecutorService create(int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
Duration queueItemTtl,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
threadFactory
);
return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxPoolSize, queueItemTtl);
}
void executeButBlockIfFull(Runnable task) throws InterruptedException; @Deprecated
default void executeButBlockIfFull(Runnable task) throws InterruptedException {
this.execute(task);
}
} }

View File

@ -1,115 +0,0 @@
package org.warp.commonutils.concurrency.executor;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.jetbrains.annotations.Nullable;
class BoundedExecutorServiceImpl extends ThreadPoolExecutor implements BoundedExecutorService {
private final Semaphore semaphore;
private final @Nullable BiConsumer<Boolean, Integer> queueSizeStatus;
private final int maxQueueSize;
private final Object queueSizeStatusLock = new Object();
/**
*
* @param maxQueueSize
* @param corePoolSize
* @param maxPoolSize
* @param keepAliveTime
* @param unit
* @param queueSizeStatus Status. The boolean indicates if the queue is full, the integer indicates the current queue size
*/
public BoundedExecutorServiceImpl(int maxQueueSize,
int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(), threadFactory);
if (maxQueueSize < 0) {
throw new IllegalArgumentException();
}
this.maxQueueSize = maxQueueSize;
this.queueSizeStatus = queueSizeStatus;
semaphore = new Semaphore(maxQueueSize);
}
/**
* Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
@Override
public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException {
return this.submit(task);
}
@Override
public Future<?> submit(Runnable task) {
preChecks();
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
preChecks();
return super.submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
preChecks();
return super.submit(task, result);
}
/**
* Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
@Override
public void executeButBlockIfFull(final Runnable task) throws InterruptedException {
this.execute(task);
}
@Override
public void execute(Runnable command) {
preChecks();
super.execute(command);
}
private void preChecks() {
try {
blockIfFull();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
var queueSize = getQueue().size();
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maxQueueSize, queueSize);
}
semaphore.release();
super.beforeExecute(t, r);
}
private void blockIfFull() throws InterruptedException {
if (semaphore.availablePermits() == 0) {
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(true, maxQueueSize + (semaphore.hasQueuedThreads() ? semaphore.getQueueLength() : 0));
}
}
semaphore.acquire();
}
}