diff --git a/pom.xml b/pom.xml index 0c6fd12..de0ab08 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ common-utils org.warp - 1.0.4 + 1.0.5 UTF-8 diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java index 74a91f8..6570988 100644 --- a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -3,6 +3,7 @@ package org.warp.commonutils.batch; import java.util.concurrent.CompletionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.warp.commonutils.concurrency.executor.BoundedExecutorService; @@ -13,10 +14,10 @@ import org.warp.commonutils.type.VariableWrapper; public class ParallelUtils { - public static void parallelize(Consumer> iterator, + public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, - int groupSize, BiConsumer consumer) { + int groupSize, Consumer consumer) throws CompletionException { var parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, @@ -27,23 +28,29 @@ public class ParallelUtils { ); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); - VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); - iterator.accept((key, value) -> { - keys.var[CHUNK_SIZE - count.var] = key; + AtomicReference firstExceptionReference = new AtomicReference<>(null); + iterator.accept((value) -> { + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + values.var[CHUNK_SIZE - count.var] = value; count.var--; if (count.var == 0) { count.var = CHUNK_SIZE; - Object[] keysCopy = keys.var; Object[] valuesCopy = values.var; - keys.var = new Object[CHUNK_SIZE]; values.var = new Object[CHUNK_SIZE]; try { parallelExecutor.execute(() -> { for (int i = 0; i < CHUNK_SIZE; i++) { - //noinspection unchecked - consumer.accept((K) keysCopy[i], (V) valuesCopy[i]); + try { + //noinspection unchecked + consumer.accept((V) valuesCopy[i]); + } catch (Exception ex) { + firstExceptionReference.compareAndSet(null, new CompletionException(ex)); + } } }); } catch (RejectedExecutionException e) { @@ -57,13 +64,79 @@ public class ParallelUtils { } catch (InterruptedException e) { throw new RuntimeException("Parallel forEach interrupted", e); } + + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + } + + public static void parallelize(Consumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, BiConsumer consumer) throws CompletionException { + var parallelExecutor = BoundedExecutorService.create(maxQueueSize, + parallelism, + parallelism, + 0, + TimeUnit.MILLISECONDS, + new ShortNamedThreadFactory("ForEachParallel"), + (a, b) -> {} + ); + final int CHUNK_SIZE = groupSize; + IntWrapper count = new IntWrapper(CHUNK_SIZE); + VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); + VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); + AtomicReference firstExceptionReference = new AtomicReference<>(null); + iterator.accept((key, value) -> { + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + + keys.var[CHUNK_SIZE - count.var] = key; + values.var[CHUNK_SIZE - count.var] = value; + count.var--; + if (count.var == 0) { + count.var = CHUNK_SIZE; + Object[] keysCopy = keys.var; + Object[] valuesCopy = values.var; + keys.var = new Object[CHUNK_SIZE]; + values.var = new Object[CHUNK_SIZE]; + try { + parallelExecutor.execute(() -> { + for (int i = 0; i < CHUNK_SIZE; i++) { + try { + //noinspection unchecked + consumer.accept((K) keysCopy[i], (V) valuesCopy[i]); + } catch (Exception ex) { + firstExceptionReference.compareAndSet(null, new CompletionException(ex)); + } + } + }); + } catch (RejectedExecutionException e) { + throw new CompletionException(e); + } + } + }); + parallelExecutor.shutdown(); + try { + parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new RuntimeException("Parallel forEach interrupted", e); + } + + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } } public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, int groupSize, - TriConsumer consumer) { + TriConsumer consumer) throws CompletionException { var parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, @@ -77,7 +150,13 @@ public class ParallelUtils { VariableWrapper keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); + AtomicReference firstExceptionReference = new AtomicReference<>(null); iterator.accept((key1, key2, value) -> { + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + keys1.var[CHUNK_SIZE - count.var] = key1; keys2.var[CHUNK_SIZE - count.var] = key2; values.var[CHUNK_SIZE - count.var] = value; @@ -93,8 +172,12 @@ public class ParallelUtils { try { parallelExecutor.execute(() -> { for (int i = 0; i < CHUNK_SIZE; i++) { - //noinspection unchecked - consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]); + try { + //noinspection unchecked + consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]); + } catch (Exception ex) { + firstExceptionReference.compareAndSet(null, new CompletionException(ex)); + } } }); } catch (RejectedExecutionException e) { @@ -108,5 +191,10 @@ public class ParallelUtils { } catch (InterruptedException e) { throw new RuntimeException("Parallel forEach interrupted", e); } + + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } } } diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorServiceDecorator.java new file mode 100644 index 0000000..5fac5a2 --- /dev/null +++ b/src/main/java/org/warp/commonutils/concurrency/executor/AsyncStackTraceExecutorServiceDecorator.java @@ -0,0 +1,26 @@ +package org.warp.commonutils.concurrency.executor; + +import java.util.concurrent.ExecutorService; +import org.jetbrains.annotations.NotNull; + +public class AsyncStackTraceExecutorServiceDecorator extends SimplerExecutorServiceDecorator { + + public AsyncStackTraceExecutorServiceDecorator(ExecutorService executorService) { + super(executorService, (executor) -> { + // Do nothing if it has already the asyncstacktrace executor service decorator + if (executorService instanceof ExecutorServiceDecorator) { + var decorators = ((ExecutorServiceDecorator) executorService).getExecutorServiceDecorators(); + if (decorators.contains(AsyncStackTraceExecutorServiceDecorator.class)) { + return new ExecutorDecorator(executorService) { + @Override + public void execute(@NotNull Runnable runnable) { + super.execute(runnable); + } + }; + } + } + + return new AsyncStackTraceExecutorDecorator(executor); + }); + } +} \ No newline at end of file diff --git a/src/main/java/org/warp/commonutils/concurrency/executor/SimplerExecutorServiceDecorator.java b/src/main/java/org/warp/commonutils/concurrency/executor/SimplerExecutorServiceDecorator.java index 1f42fe3..58e7f18 100644 --- a/src/main/java/org/warp/commonutils/concurrency/executor/SimplerExecutorServiceDecorator.java +++ b/src/main/java/org/warp/commonutils/concurrency/executor/SimplerExecutorServiceDecorator.java @@ -12,7 +12,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import org.jetbrains.annotations.NotNull; -public class SimplerExecutorServiceDecorator extends ExecutorServiceDecorator { +public abstract class SimplerExecutorServiceDecorator extends ExecutorServiceDecorator { private final ExecutorDecorator executorDecorator;