Fixed bounded executor service
This commit is contained in:
parent
453a48e3a8
commit
72bdf4a309
|
@ -1,6 +1,7 @@
|
||||||
package org.warp.commonutils.batch;
|
package org.warp.commonutils.batch;
|
||||||
|
|
||||||
import java.util.concurrent.CompletionException;
|
import java.util.concurrent.CompletionException;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -33,13 +34,13 @@ public class ParallelUtils {
|
||||||
keys.var = new Object[CHUNK_SIZE];
|
keys.var = new Object[CHUNK_SIZE];
|
||||||
values.var = new Object[CHUNK_SIZE];
|
values.var = new Object[CHUNK_SIZE];
|
||||||
try {
|
try {
|
||||||
parallelExecutor.executeButBlockIfFull(() -> {
|
parallelExecutor.execute(() -> {
|
||||||
for (int i = 0; i < CHUNK_SIZE; i++) {
|
for (int i = 0; i < CHUNK_SIZE; i++) {
|
||||||
//noinspection unchecked
|
//noinspection unchecked
|
||||||
consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
|
consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (InterruptedException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
throw new CompletionException(e);
|
throw new CompletionException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,13 +78,13 @@ public class ParallelUtils {
|
||||||
keys2.var = new Object[CHUNK_SIZE];
|
keys2.var = new Object[CHUNK_SIZE];
|
||||||
values.var = new Object[CHUNK_SIZE];
|
values.var = new Object[CHUNK_SIZE];
|
||||||
try {
|
try {
|
||||||
parallelExecutor.executeButBlockIfFull(() -> {
|
parallelExecutor.execute(() -> {
|
||||||
for (int i = 0; i < CHUNK_SIZE; i++) {
|
for (int i = 0; i < CHUNK_SIZE; i++) {
|
||||||
//noinspection unchecked
|
//noinspection unchecked
|
||||||
consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
|
consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (InterruptedException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
throw new CompletionException(e);
|
throw new CompletionException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,8 +122,10 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
|
||||||
|
|
||||||
public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier<Integer> queueSizeSupplier, @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
|
public BlockingOnFullQueueExecutorServiceDecorator(@Nonnull final ExecutorService executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout, @Nonnull Supplier<Integer> queueSizeSupplier, @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
|
||||||
this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
|
this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
|
||||||
if (maximumTaskNumber < 1) {
|
if (maximumTaskNumber < 0) {
|
||||||
throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
|
throw new IllegalArgumentException(String.format("At least zero tasks must be permitted, not '%d'", maximumTaskNumber));
|
||||||
|
} else if (maximumTaskNumber == 0) {
|
||||||
|
ignoreTaskLimit = true;
|
||||||
}
|
}
|
||||||
this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
|
this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
|
||||||
if (this.timeout.isNegative()) {
|
if (this.timeout.isNegative()) {
|
||||||
|
@ -155,7 +157,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
|
||||||
} catch (final InterruptedException e) {
|
} catch (final InterruptedException e) {
|
||||||
// restore interrupt status
|
// restore interrupt status
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw new IllegalStateException(e);
|
throw new RejectedExecutionException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -167,7 +169,7 @@ public class BlockingOnFullQueueExecutorServiceDecorator implements BoundedExecu
|
||||||
this.delegate.execute(new PermitReleasingRunnableDecorator(command, () -> {
|
this.delegate.execute(new PermitReleasingRunnableDecorator(command, () -> {
|
||||||
var queueSize = queueSizeSupplier.get();
|
var queueSize = queueSizeSupplier.get();
|
||||||
synchronized (queueSizeStatusLock) {
|
synchronized (queueSizeStatusLock) {
|
||||||
if (queueSizeStatus != null) queueSizeStatus.accept(queueSize >= maximumTaskNumber, queueSize);
|
if (queueSizeStatus != null) queueSizeStatus.accept(!ignoreTaskLimit && queueSize >= maximumTaskNumber, queueSize);
|
||||||
}
|
}
|
||||||
}, this.taskLimit));
|
}, this.taskLimit));
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,31 @@ import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
public interface BoundedExecutorService extends ExecutorService {
|
public interface BoundedExecutorService extends ExecutorService {
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
static ExecutorService createUnbounded(
|
||||||
|
int corePoolSize,
|
||||||
|
long keepAliveTime,
|
||||||
|
TimeUnit unit,
|
||||||
|
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
|
||||||
|
return create(0, corePoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
static ExecutorService createUnbounded(
|
||||||
|
int corePoolSize,
|
||||||
|
long keepAliveTime,
|
||||||
|
TimeUnit unit,
|
||||||
|
ThreadFactory threadFactory,
|
||||||
|
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
|
||||||
|
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
|
||||||
|
corePoolSize,
|
||||||
|
keepAliveTime,
|
||||||
|
unit,
|
||||||
|
new LinkedBlockingQueue<>(),
|
||||||
|
threadFactory
|
||||||
|
);
|
||||||
|
return create(0, corePoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(1000000), queueSizeStatus);
|
||||||
|
}
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
static BoundedExecutorService create(int maxQueueSize,
|
static BoundedExecutorService create(int maxQueueSize,
|
||||||
int corePoolSize,
|
int corePoolSize,
|
||||||
|
@ -59,9 +84,4 @@ public interface BoundedExecutorService extends ExecutorService {
|
||||||
queueSizeStatus
|
queueSizeStatus
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
default void executeButBlockIfFull(Runnable task) throws InterruptedException {
|
|
||||||
this.execute(task);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ public class BoundedQueueTest {
|
||||||
|
|
||||||
for (int i = 0; i < 10000; i++) {
|
for (int i = 0; i < 10000; i++) {
|
||||||
queueSize.incrementAndGet();
|
queueSize.incrementAndGet();
|
||||||
executor.executeButBlockIfFull(queueSize::decrementAndGet);
|
executor.execute(queueSize::decrementAndGet);
|
||||||
}
|
}
|
||||||
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue
Block a user