Update ParallelUtils.java, BlockingOnFullQueueExecutorServiceDecorator.java, and 2 more files...

This commit is contained in:
Andrea Cavalli 2020-07-11 16:21:54 +02:00
parent 3ca4ea9760
commit 152c965200
4 changed files with 66 additions and 22 deletions

View File

@ -17,7 +17,7 @@ public class ParallelUtils {
int parallelism, int parallelism,
int groupSize, int groupSize,
BiConsumer<K, V> consumer) { BiConsumer<K, V> consumer) {
BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism * 2, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {});
final int CHUNK_SIZE = groupSize; final int CHUNK_SIZE = groupSize;
IntWrapper count = new IntWrapper(CHUNK_SIZE); IntWrapper count = new IntWrapper(CHUNK_SIZE);
VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
@ -57,7 +57,7 @@ public class ParallelUtils {
int parallelism, int parallelism,
int groupSize, int groupSize,
TriConsumer<K1, K2, V> consumer) { TriConsumer<K1, K2, V> consumer) {
BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism * 2, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {});
final int CHUNK_SIZE = groupSize; final int CHUNK_SIZE = groupSize;
IntWrapper count = new IntWrapper(CHUNK_SIZE); IntWrapper count = new IntWrapper(CHUNK_SIZE);
VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);

View File

@ -14,8 +14,11 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecutorService { public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecutorService {
@ -78,16 +81,28 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
} }
} }
private volatile boolean ignoreTaskLimit;
@Nonnull @Nonnull
private final Semaphore taskLimit; private final Semaphore taskLimit;
@Nonnull @Nonnull
private final Duration timeout; private final Duration timeout;
private final int maximumTaskNumber;
@Nonnull
private final Supplier<Integer> queueSizeSupplier;
private final @Nullable BiConsumer<Boolean, Integer> queueSizeStatus;
@Nonnull
private final Object queueSizeStatusLock;
@Nonnull @Nonnull
private final ExecutorService delegate; private final ExecutorService delegate;
public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier<Integer> queueSizeSupplier, @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
if (maximumTaskNumber < 1) { if (maximumTaskNumber < 1) {
throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
@ -96,23 +111,42 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
if (this.timeout.isNegative()) { if (this.timeout.isNegative()) {
throw new IllegalArgumentException("'maximumTimeout' must not be negative"); throw new IllegalArgumentException("'maximumTimeout' must not be negative");
} }
this.maximumTaskNumber = maximumTaskNumber;
this.queueSizeSupplier = queueSizeSupplier;
this.queueSizeStatus = queueSizeStatus;
this.queueSizeStatusLock = new Object();
this.taskLimit = new Semaphore(maximumTaskNumber); this.taskLimit = new Semaphore(maximumTaskNumber);
} }
private void preExecute(Object command) { private void preExecute(Object command) {
Objects.requireNonNull(command, "'command' must not be null"); Objects.requireNonNull(command, "'command' must not be null");
var queueSize = queueSizeSupplier.get();
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize);
}
if (!ignoreTaskLimit) {
try { try {
if (this.taskLimit.availablePermits() == 0) {
synchronized (queueSizeStatusLock) {
if (queueSizeStatus != null)
queueSizeStatus.accept(true,
maximumTaskNumber + (taskLimit.hasQueuedThreads() ? taskLimit.getQueueLength() : 0)
);
}
}
// attempt to acquire permit for task execution // attempt to acquire permit for task execution
if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
} }
} } catch (final InterruptedException e) {
catch (final InterruptedException e) {
// restore interrupt status // restore interrupt status
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
} }
}
@Override @Override
public final void execute(final Runnable command) { public final void execute(final Runnable command) {
@ -124,12 +158,20 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
@Override @Override
public void shutdown() { public void shutdown() {
this.ignoreTaskLimit = true;
while (this.taskLimit.hasQueuedThreads()) {
this.taskLimit.release(10);
}
this.delegate.shutdown(); this.delegate.shutdown();
} }
@NotNull @NotNull
@Override @Override
public List<Runnable> shutdownNow() { public List<Runnable> shutdownNow() {
this.ignoreTaskLimit = true;
while (this.taskLimit.hasQueuedThreads()) {
this.taskLimit.release(10);
}
return this.delegate.shutdownNow(); return this.delegate.shutdownNow();
} }

View File

@ -15,46 +15,49 @@ public interface BoundedExecutorService extends ExecutorService {
@Deprecated @Deprecated
static BoundedExecutorService create(int maxQueueSize, static BoundedExecutorService create(int maxQueueSize,
int corePoolSize, int corePoolSize,
int maxPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); return create(maxQueueSize, corePoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
} }
static BoundedExecutorService create(int maxQueueSize, static BoundedExecutorService create(int maxQueueSize,
int corePoolSize, int corePoolSize,
int maxPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
ThreadFactory threadFactory, ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize, corePoolSize,
keepAliveTime, keepAliveTime,
unit, unit,
new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>(),
threadFactory threadFactory
); );
return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxPoolSize, Duration.ofDays(1000000)); return create(maxQueueSize, corePoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(1000000), queueSizeStatus);
} }
static BoundedExecutorService create(int maxQueueSize, static BoundedExecutorService create(int maxQueueSize,
int corePoolSize, int corePoolSize,
int maxPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
ThreadFactory threadFactory, ThreadFactory threadFactory,
Duration queueItemTtl, Duration queueItemTtl,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
var queue = new LinkedBlockingQueue<Runnable>();
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize, corePoolSize,
keepAliveTime, keepAliveTime,
unit, unit,
new LinkedBlockingQueue<>(), queue,
threadFactory threadFactory
); );
return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor, maxPoolSize, queueItemTtl); return new BlockingOnFullQueueExecutorServiceDecorator(threadPoolExecutor,
maxQueueSize,
queueItemTtl,
queue::size,
queueSizeStatus
);
} }
@Deprecated @Deprecated

View File

@ -17,7 +17,6 @@ public class BoundedQueueTest {
AtomicInteger queueSize = new AtomicInteger(); AtomicInteger queueSize = new AtomicInteger();
AtomicReference<AssertionFailedError> failedError = new AtomicReference<>(); AtomicReference<AssertionFailedError> failedError = new AtomicReference<>();
var executor = BoundedExecutorService.create(maxQueueSize, var executor = BoundedExecutorService.create(maxQueueSize,
1,
1, 1,
0L, 0L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,