package org.warp.commonutils.batch; import java.util.concurrent.CompletionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.warp.commonutils.concurrency.executor.BoundedExecutorService; import org.warp.commonutils.functional.TriConsumer; import org.warp.commonutils.type.IntWrapper; import org.warp.commonutils.type.ShortNamedThreadFactory; import org.warp.commonutils.type.VariableWrapper; public class ParallelUtils { public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, int groupSize, Consumer consumer) throws CompletionException { var parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {} ); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); AtomicReference firstExceptionReference = new AtomicReference<>(null); iterator.accept((value) -> { var firstException = firstExceptionReference.get(); if (firstException != null) { throw firstException; } values.var[CHUNK_SIZE - count.var] = value; count.var--; if (count.var == 0) { count.var = CHUNK_SIZE; Object[] valuesCopy = values.var; values.var = new Object[CHUNK_SIZE]; try { parallelExecutor.execute(() -> { for (int i = 0; i < CHUNK_SIZE; i++) { try { //noinspection unchecked consumer.accept((V) valuesCopy[i]); } catch (Exception ex) { firstExceptionReference.compareAndSet(null, new CompletionException(ex)); } } }); } catch (RejectedExecutionException e) { throw new CompletionException(e); } } }); parallelExecutor.shutdown(); try { parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); } catch (InterruptedException e) { throw new RuntimeException("Parallel forEach interrupted", e); } var firstException = firstExceptionReference.get(); if (firstException != null) { throw firstException; } } public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, int groupSize, BiConsumer consumer) throws CompletionException { var parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {} ); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); AtomicReference firstExceptionReference = new AtomicReference<>(null); iterator.accept((key, value) -> { var firstException = firstExceptionReference.get(); if (firstException != null) { throw firstException; } keys.var[CHUNK_SIZE - count.var] = key; values.var[CHUNK_SIZE - count.var] = value; count.var--; if (count.var == 0) { count.var = CHUNK_SIZE; Object[] keysCopy = keys.var; Object[] valuesCopy = values.var; keys.var = new Object[CHUNK_SIZE]; values.var = new Object[CHUNK_SIZE]; try { parallelExecutor.execute(() -> { for (int i = 0; i < CHUNK_SIZE; i++) { try { //noinspection unchecked consumer.accept((K) keysCopy[i], (V) valuesCopy[i]); } catch (Exception ex) { firstExceptionReference.compareAndSet(null, new CompletionException(ex)); break; } } }); } catch (RejectedExecutionException e) { throw new CompletionException(e); } } }); parallelExecutor.shutdown(); try { parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); } catch (InterruptedException e) { throw new RuntimeException("Parallel forEach interrupted", e); } var firstException = firstExceptionReference.get(); if (firstException != null) { throw firstException; } } public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, int groupSize, TriConsumer consumer) throws CompletionException { var parallelExecutor = BoundedExecutorService.create(maxQueueSize, parallelism, parallelism, 0, TimeUnit.MILLISECONDS, new ShortNamedThreadFactory("ForEachParallel"), (a, b) -> {} ); final int CHUNK_SIZE = groupSize; IntWrapper count = new IntWrapper(CHUNK_SIZE); VariableWrapper keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]); VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); AtomicReference firstExceptionReference = new AtomicReference<>(null); iterator.accept((key1, key2, value) -> { var firstException = firstExceptionReference.get(); if (firstException != null) { throw firstException; } keys1.var[CHUNK_SIZE - count.var] = key1; keys2.var[CHUNK_SIZE - count.var] = key2; values.var[CHUNK_SIZE - count.var] = value; count.var--; if (count.var == 0) { count.var = CHUNK_SIZE; Object[] keys1Copy = keys1.var; Object[] keys2Copy = keys2.var; Object[] valuesCopy = values.var; keys1.var = new Object[CHUNK_SIZE]; keys2.var = new Object[CHUNK_SIZE]; values.var = new Object[CHUNK_SIZE]; try { parallelExecutor.execute(() -> { for (int i = 0; i < CHUNK_SIZE; i++) { try { //noinspection unchecked consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]); } catch (Exception ex) { firstExceptionReference.compareAndSet(null, new CompletionException(ex)); } } }); } catch (RejectedExecutionException e) { throw new CompletionException(e); } } }); parallelExecutor.shutdown(); try { parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); } catch (InterruptedException e) { throw new RuntimeException("Parallel forEach interrupted", e); } var firstException = firstExceptionReference.get(); if (firstException != null) { throw firstException; } } }