Updated to 1.0.3

This commit is contained in:
Andrea Cavalli 2020-08-16 22:52:46 +02:00
parent 065635e24d
commit 02ad72c643
4 changed files with 30 additions and 27 deletions

View File

@ -7,7 +7,7 @@
<artifactId>common-utils</artifactId> <artifactId>common-utils</artifactId>
<groupId>org.warp</groupId> <groupId>org.warp</groupId>
<version>1.0.2</version> <version>1.0.3</version>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

View File

@ -18,7 +18,7 @@ public class ParallelUtils {
int parallelism, int parallelism,
int groupSize, int groupSize,
BiConsumer<K, V> consumer) { BiConsumer<K, V> consumer) {
BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {});
final int CHUNK_SIZE = groupSize; final int CHUNK_SIZE = groupSize;
IntWrapper count = new IntWrapper(CHUNK_SIZE); IntWrapper count = new IntWrapper(CHUNK_SIZE);
VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
@ -58,7 +58,7 @@ public class ParallelUtils {
int parallelism, int parallelism,
int groupSize, int groupSize,
TriConsumer<K1, K2, V> consumer) { TriConsumer<K1, K2, V> consumer) {
BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {}); BoundedExecutorService parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {});
final int CHUNK_SIZE = groupSize; final int CHUNK_SIZE = groupSize;
IntWrapper count = new IntWrapper(CHUNK_SIZE); IntWrapper count = new IntWrapper(CHUNK_SIZE);
VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);

View File

@ -14,64 +14,60 @@ public interface BoundedExecutorService extends ExecutorService {
@Deprecated @Deprecated
static ExecutorService createUnbounded( static ExecutorService createUnbounded(
boolean resizable,
int corePoolSize, int corePoolSize,
int maxPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return create(0, corePoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); return create(0, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
} }
static ExecutorService createUnbounded( static ExecutorService createUnbounded(
boolean resizable,
int corePoolSize, int corePoolSize,
int maxPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
ThreadFactory threadFactory, ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, return create(0, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus);
corePoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
threadFactory
);
return create(0, corePoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(1000000), queueSizeStatus);
} }
@Deprecated @Deprecated
static BoundedExecutorService create(int maxQueueSize, static BoundedExecutorService create(
int maxQueueSize,
int corePoolSize, int corePoolSize,
int maxPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
return create(maxQueueSize, corePoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus); return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueSizeStatus);
} }
static BoundedExecutorService create(int maxQueueSize, static BoundedExecutorService create(
int maxQueueSize,
int corePoolSize, int corePoolSize,
int maxPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
ThreadFactory threadFactory, ThreadFactory threadFactory,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, return create(maxQueueSize, corePoolSize, maxPoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(100000), queueSizeStatus);
corePoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
threadFactory
);
return create(maxQueueSize, corePoolSize, keepAliveTime, unit, threadFactory, Duration.ofDays(1000000), queueSizeStatus);
} }
static BoundedExecutorService create(int maxQueueSize, static BoundedExecutorService create(
int maxQueueSize,
int corePoolSize, int corePoolSize,
int maxPoolSize,
long keepAliveTime, long keepAliveTime,
TimeUnit unit, TimeUnit unit,
ThreadFactory threadFactory, ThreadFactory threadFactory,
Duration queueItemTtl, Duration queueItemTtl,
@Nullable BiConsumer<Boolean, Integer> queueSizeStatus) { @Nullable BiConsumer<Boolean, Integer> queueSizeStatus) {
var queue = new LinkedBlockingQueue<Runnable>(); var queue = new LinkedBlockingQueue<Runnable>();
var threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
corePoolSize, maxPoolSize,
keepAliveTime, keepAliveTime,
unit, unit,
queue, queue,

View File

@ -13,11 +13,18 @@ public class BoundedQueueTest {
@Test @Test
public void testBoundedQueue() throws InterruptedException { public void testBoundedQueue() throws InterruptedException {
testBoundedQueue(1, 1);
testBoundedQueue(1, 10);
testBoundedQueue(4, 10);
}
public void testBoundedQueue(int corePoolSize, int maxPoolSize) throws InterruptedException {
int maxQueueSize = 2; int maxQueueSize = 2;
AtomicInteger queueSize = new AtomicInteger(); AtomicInteger queueSize = new AtomicInteger();
AtomicReference<AssertionFailedError> failedError = new AtomicReference<>(); AtomicReference<AssertionFailedError> failedError = new AtomicReference<>();
var executor = BoundedExecutorService.create(maxQueueSize, var executor = BoundedExecutorService.create(maxQueueSize,
1, corePoolSize,
maxPoolSize,
0L, 0L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new ShortNamedThreadFactory("test"), new ShortNamedThreadFactory("test"),