Update to 1.0.5
This commit is contained in:
parent
ec66736bd5
commit
3b9dd36025
2
pom.xml
2
pom.xml
@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
<artifactId>common-utils</artifactId>
|
<artifactId>common-utils</artifactId>
|
||||||
<groupId>org.warp</groupId>
|
<groupId>org.warp</groupId>
|
||||||
<version>1.0.4</version>
|
<version>1.0.5</version>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
@ -3,6 +3,7 @@ package org.warp.commonutils.batch;
|
|||||||
import java.util.concurrent.CompletionException;
|
import java.util.concurrent.CompletionException;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
|
import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
|
||||||
@ -13,10 +14,10 @@ import org.warp.commonutils.type.VariableWrapper;
|
|||||||
|
|
||||||
public class ParallelUtils {
|
public class ParallelUtils {
|
||||||
|
|
||||||
public static <K, V> void parallelize(Consumer<BiConsumer<K, V>> iterator,
|
public static <V> void parallelize(Consumer<Consumer<V>> iterator,
|
||||||
int maxQueueSize,
|
int maxQueueSize,
|
||||||
int parallelism,
|
int parallelism,
|
||||||
int groupSize, BiConsumer<K, V> consumer) {
|
int groupSize, Consumer<V> consumer) throws CompletionException {
|
||||||
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
|
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
|
||||||
parallelism,
|
parallelism,
|
||||||
parallelism,
|
parallelism,
|
||||||
@ -27,23 +28,29 @@ public class ParallelUtils {
|
|||||||
);
|
);
|
||||||
final int CHUNK_SIZE = groupSize;
|
final int CHUNK_SIZE = groupSize;
|
||||||
IntWrapper count = new IntWrapper(CHUNK_SIZE);
|
IntWrapper count = new IntWrapper(CHUNK_SIZE);
|
||||||
VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
|
||||||
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
||||||
iterator.accept((key, value) -> {
|
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
|
||||||
keys.var[CHUNK_SIZE - count.var] = key;
|
iterator.accept((value) -> {
|
||||||
|
var firstException = firstExceptionReference.get();
|
||||||
|
if (firstException != null) {
|
||||||
|
throw firstException;
|
||||||
|
}
|
||||||
|
|
||||||
values.var[CHUNK_SIZE - count.var] = value;
|
values.var[CHUNK_SIZE - count.var] = value;
|
||||||
count.var--;
|
count.var--;
|
||||||
if (count.var == 0) {
|
if (count.var == 0) {
|
||||||
count.var = CHUNK_SIZE;
|
count.var = CHUNK_SIZE;
|
||||||
Object[] keysCopy = keys.var;
|
|
||||||
Object[] valuesCopy = values.var;
|
Object[] valuesCopy = values.var;
|
||||||
keys.var = new Object[CHUNK_SIZE];
|
|
||||||
values.var = new Object[CHUNK_SIZE];
|
values.var = new Object[CHUNK_SIZE];
|
||||||
try {
|
try {
|
||||||
parallelExecutor.execute(() -> {
|
parallelExecutor.execute(() -> {
|
||||||
for (int i = 0; i < CHUNK_SIZE; i++) {
|
for (int i = 0; i < CHUNK_SIZE; i++) {
|
||||||
//noinspection unchecked
|
try {
|
||||||
consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
|
//noinspection unchecked
|
||||||
|
consumer.accept((V) valuesCopy[i]);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (RejectedExecutionException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
@ -57,13 +64,79 @@ public class ParallelUtils {
|
|||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new RuntimeException("Parallel forEach interrupted", e);
|
throw new RuntimeException("Parallel forEach interrupted", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var firstException = firstExceptionReference.get();
|
||||||
|
if (firstException != null) {
|
||||||
|
throw firstException;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <K, V> void parallelize(Consumer<BiConsumer<K, V>> iterator,
|
||||||
|
int maxQueueSize,
|
||||||
|
int parallelism,
|
||||||
|
int groupSize, BiConsumer<K, V> consumer) throws CompletionException {
|
||||||
|
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
|
||||||
|
parallelism,
|
||||||
|
parallelism,
|
||||||
|
0,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
new ShortNamedThreadFactory("ForEachParallel"),
|
||||||
|
(a, b) -> {}
|
||||||
|
);
|
||||||
|
final int CHUNK_SIZE = groupSize;
|
||||||
|
IntWrapper count = new IntWrapper(CHUNK_SIZE);
|
||||||
|
VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
||||||
|
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
||||||
|
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
|
||||||
|
iterator.accept((key, value) -> {
|
||||||
|
var firstException = firstExceptionReference.get();
|
||||||
|
if (firstException != null) {
|
||||||
|
throw firstException;
|
||||||
|
}
|
||||||
|
|
||||||
|
keys.var[CHUNK_SIZE - count.var] = key;
|
||||||
|
values.var[CHUNK_SIZE - count.var] = value;
|
||||||
|
count.var--;
|
||||||
|
if (count.var == 0) {
|
||||||
|
count.var = CHUNK_SIZE;
|
||||||
|
Object[] keysCopy = keys.var;
|
||||||
|
Object[] valuesCopy = values.var;
|
||||||
|
keys.var = new Object[CHUNK_SIZE];
|
||||||
|
values.var = new Object[CHUNK_SIZE];
|
||||||
|
try {
|
||||||
|
parallelExecutor.execute(() -> {
|
||||||
|
for (int i = 0; i < CHUNK_SIZE; i++) {
|
||||||
|
try {
|
||||||
|
//noinspection unchecked
|
||||||
|
consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (RejectedExecutionException e) {
|
||||||
|
throw new CompletionException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
parallelExecutor.shutdown();
|
||||||
|
try {
|
||||||
|
parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException("Parallel forEach interrupted", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
var firstException = firstExceptionReference.get();
|
||||||
|
if (firstException != null) {
|
||||||
|
throw firstException;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <K1, K2, V> void parallelize(Consumer<TriConsumer<K1, K2, V>> iterator,
|
public static <K1, K2, V> void parallelize(Consumer<TriConsumer<K1, K2, V>> iterator,
|
||||||
int maxQueueSize,
|
int maxQueueSize,
|
||||||
int parallelism,
|
int parallelism,
|
||||||
int groupSize,
|
int groupSize,
|
||||||
TriConsumer<K1, K2, V> consumer) {
|
TriConsumer<K1, K2, V> consumer) throws CompletionException {
|
||||||
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
|
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
|
||||||
parallelism,
|
parallelism,
|
||||||
parallelism,
|
parallelism,
|
||||||
@ -77,7 +150,13 @@ public class ParallelUtils {
|
|||||||
VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
||||||
VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
||||||
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
||||||
|
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
|
||||||
iterator.accept((key1, key2, value) -> {
|
iterator.accept((key1, key2, value) -> {
|
||||||
|
var firstException = firstExceptionReference.get();
|
||||||
|
if (firstException != null) {
|
||||||
|
throw firstException;
|
||||||
|
}
|
||||||
|
|
||||||
keys1.var[CHUNK_SIZE - count.var] = key1;
|
keys1.var[CHUNK_SIZE - count.var] = key1;
|
||||||
keys2.var[CHUNK_SIZE - count.var] = key2;
|
keys2.var[CHUNK_SIZE - count.var] = key2;
|
||||||
values.var[CHUNK_SIZE - count.var] = value;
|
values.var[CHUNK_SIZE - count.var] = value;
|
||||||
@ -93,8 +172,12 @@ public class ParallelUtils {
|
|||||||
try {
|
try {
|
||||||
parallelExecutor.execute(() -> {
|
parallelExecutor.execute(() -> {
|
||||||
for (int i = 0; i < CHUNK_SIZE; i++) {
|
for (int i = 0; i < CHUNK_SIZE; i++) {
|
||||||
//noinspection unchecked
|
try {
|
||||||
consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
|
//noinspection unchecked
|
||||||
|
consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (RejectedExecutionException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
@ -108,5 +191,10 @@ public class ParallelUtils {
|
|||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new RuntimeException("Parallel forEach interrupted", e);
|
throw new RuntimeException("Parallel forEach interrupted", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var firstException = firstExceptionReference.get();
|
||||||
|
if (firstException != null) {
|
||||||
|
throw firstException;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
package org.warp.commonutils.concurrency.executor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
|
public class AsyncStackTraceExecutorServiceDecorator extends SimplerExecutorServiceDecorator {
|
||||||
|
|
||||||
|
public AsyncStackTraceExecutorServiceDecorator(ExecutorService executorService) {
|
||||||
|
super(executorService, (executor) -> {
|
||||||
|
// Do nothing if it has already the asyncstacktrace executor service decorator
|
||||||
|
if (executorService instanceof ExecutorServiceDecorator) {
|
||||||
|
var decorators = ((ExecutorServiceDecorator) executorService).getExecutorServiceDecorators();
|
||||||
|
if (decorators.contains(AsyncStackTraceExecutorServiceDecorator.class)) {
|
||||||
|
return new ExecutorDecorator(executorService) {
|
||||||
|
@Override
|
||||||
|
public void execute(@NotNull Runnable runnable) {
|
||||||
|
super.execute(runnable);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new AsyncStackTraceExecutorDecorator(executor);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -12,7 +12,7 @@ import java.util.concurrent.TimeoutException;
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
public class SimplerExecutorServiceDecorator extends ExecutorServiceDecorator {
|
public abstract class SimplerExecutorServiceDecorator extends ExecutorServiceDecorator {
|
||||||
|
|
||||||
private final ExecutorDecorator executorDecorator;
|
private final ExecutorDecorator executorDecorator;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user