package org.warp.commonutils.batch;

import java.io.IOException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.warp.commonutils.concurrency.executor.BlockingOnFullQueueExecutorServiceDecorator;
import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableConsumer;
import org.warp.commonutils.functional.CancellableTriConsumer;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.functional.IOBiConsumer;
import org.warp.commonutils.functional.IOConsumer;
import org.warp.commonutils.functional.IOTriConsumer;
import org.warp.commonutils.functional.TriConsumer;
import org.warp.commonutils.type.IntWrapper;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import org.warp.commonutils.type.VariableWrapper;

/**
 * Helpers that buffer items produced by an iterator into fixed-size groups and hand each full
 * group to a bounded executor, with overloads for plain, IO-throwing and cancellable consumers.
 */
public class ParallelUtils {

	public static <V> void parallelizeIO(IOConsumer<IOConsumer<V>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			IOConsumer<V> consumer) throws IOException {
		Consumer<Consumer<V>> action = (cons) -> {
			try {
				iterator.consume(cons::accept);
			} catch (IOException e) {
				throw new CompletionException(e);
			}
		};
		try {
			parallelize(action, maxQueueSize, parallelism, groupSize, (v) -> {
				try {
					consumer.consume(v);
				} catch (IOException ex) {
					throw new CompletionException(ex);
				}
			});
		} catch (CompletionException ex) {
			if (ex.getCause() instanceof CompletionException && ex.getCause().getCause() instanceof IOException) {
				throw (IOException) ex.getCause().getCause();
			} else if (ex.getCause() instanceof IOException) {
				throw (IOException) ex.getCause();
			} else {
				throw new IOException(ex);
			}
		}
	}

	public static <V> void parallelize(Consumer<Consumer<V>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			Consumer<V> consumer) throws CompletionException {
		var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
				parallelism,
				0,
				TimeUnit.MILLISECONDS,
				new ShortNamedThreadFactory("ForEachParallel"),
				(a, b) -> {}
		);
		final int CHUNK_SIZE = groupSize;
		IntWrapper count = new IntWrapper(CHUNK_SIZE);
		VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
		AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
		final Object arraysAccessLock = new Object();
		iterator.accept((value) -> {
			synchronized (arraysAccessLock) {
				var firstException = firstExceptionReference.get();
				if (firstException != null) {
					throw firstException;
				}

				values.var[CHUNK_SIZE - count.var] = value;
				count.var--;

				if (count.var == 0) {
					sendChunkItems(values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
				}
			}
		});
		parallelExecutor.shutdown();
		try {
			parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
		} catch (InterruptedException e) {
			throw new RuntimeException("Parallel forEach interrupted", e);
		}
		synchronized (arraysAccessLock) {
			if (count.var > 0) {
				sendChunkItems(values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
			}
		}
		var firstException = firstExceptionReference.get();
		if (firstException != null) {
			throw firstException;
		}
	}
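	// Illustrative usage sketch (not part of the original file): the single-value overload above
	// can be fed by any method that accepts a Consumer, e.g. Iterable::forEach. The list, the
	// sizes and processLine(...) below are hypothetical.
	//
	//   List<String> lines = Files.readAllLines(path);
	//   ParallelUtils.parallelize(lines::forEach, 1000, 8, 50, line -> processLine(line));
	//
	// lines::forEach satisfies Consumer<Consumer<String>>: items are buffered in groups of 50
	// under arraysAccessLock, and every full group is handed to the bounded executor.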
	private static <V> void sendChunkItems(VariableWrapper<Object[]> values,
			int CHUNK_SIZE,
			IntWrapper count,
			Consumer<V> consumer,
			BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
			AtomicReference<CompletionException> firstExceptionReference) {
		var itemsCount = CHUNK_SIZE - count.var;
		count.var = CHUNK_SIZE;
		Object[] valuesCopy = values.var;
		values.var = new Object[itemsCount];
		try {
			Runnable action = () -> {
				for (int i = 0; i < itemsCount; i++) {
					try {
						//noinspection unchecked
						consumer.accept((V) valuesCopy[i]);
					} catch (Exception ex) {
						firstExceptionReference.compareAndSet(null, new CompletionException(ex));
					}
				}
			};
			if (parallelExecutor != null) {
				parallelExecutor.execute(action);
			} else {
				action.run();
			}
		} catch (RejectedExecutionException e) {
			throw new CompletionException(e);
		}
	}

	public static <K> ConsumerResult parallelize(Consumer<CancellableConsumer<K>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			CancellableConsumer<K> consumer) throws CompletionException {
		if (parallelism <= 1) {
			iterator.accept(consumer);
			return ConsumerResult.result();
		} else {
			var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
					parallelism,
					0,
					TimeUnit.MILLISECONDS,
					new ShortNamedThreadFactory("ForEachParallel"),
					(a, b) -> {}
			);
			final int CHUNK_SIZE = groupSize;
			IntWrapper count = new IntWrapper(CHUNK_SIZE);
			VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
			AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
			AtomicBoolean cancelled = new AtomicBoolean(false);
			final Object arraysAccessLock = new Object();
			iterator.accept((key) -> {
				synchronized (arraysAccessLock) {
					var firstException = firstExceptionReference.get();
					if (firstException != null) {
						throw firstException;
					}
					var cancelledVal = cancelled.get();
					if (cancelledVal) {
						return ConsumerResult.cancelNext();
					}

					keys.var[CHUNK_SIZE - count.var] = key;
					count.var--;

					if (count.var == 0) {
						return sendChunkItems(keys,
								CHUNK_SIZE,
								count,
								consumer,
								parallelExecutor,
								firstExceptionReference,
								cancelled
						);
					} else {
						return ConsumerResult.result();
					}
				}
			});
			parallelExecutor.shutdown();
			try {
				parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
			} catch (InterruptedException e) {
				throw new RuntimeException("Parallel forEach interrupted", e);
			}
			synchronized (arraysAccessLock) {
				if (count.var > 0) {
					var sendChunkItemsResult = sendChunkItems(keys,
							CHUNK_SIZE,
							count,
							consumer,
							null,
							firstExceptionReference,
							cancelled
					);
					cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled());
				}
			}
			var firstException = firstExceptionReference.get();
			if (firstException != null) {
				throw firstException;
			}
			if (cancelled.get()) {
				return ConsumerResult.cancelNext();
			} else {
				return ConsumerResult.result();
			}
		}
	}

	private static <K> ConsumerResult sendChunkItems(VariableWrapper<Object[]> keys,
			final int CHUNK_SIZE,
			IntWrapper count,
			CancellableConsumer<K> consumer,
			BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
			AtomicReference<CompletionException> firstExceptionReference,
			AtomicBoolean cancelled) {
		int itemsCount = CHUNK_SIZE - count.var;
		count.var = CHUNK_SIZE;
		Object[] keysCopy = keys.var;
		keys.var = new Object[itemsCount];
		try {
			Supplier<ConsumerResult> action = () -> {
				for (int i = 0; i < itemsCount; i++) {
					try {
						//noinspection unchecked
						if (consumer.acceptCancellable((K) keysCopy[i]).isCancelled()) {
							cancelled.set(true);
							return ConsumerResult.cancelNext();
						}
					} catch (Exception ex) {
						firstExceptionReference.compareAndSet(null, new CompletionException(ex));
						return ConsumerResult.cancelNext();
					}
				}
				return ConsumerResult.result();
			};
			if (parallelExecutor != null) {
				parallelExecutor.execute(action::get);
				return ConsumerResult.result();
			} else {
				return action.get();
			}
		} catch (RejectedExecutionException e) {
			throw new CompletionException(e);
		}
	}
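	// Illustrative sketch (assumption, not from the original file): the cancellable overloads let
	// the consumer stop the iteration early by returning ConsumerResult.cancelNext(). Here
	// source::forEachKey is a hypothetical method that accepts a CancellableConsumer<K>, and
	// shouldStop(...) and handle(...) are hypothetical helpers.
	//
	//   ConsumerResult result = ParallelUtils.parallelize(source::forEachKey, 1000, 8, 50, key -> {
	//       if (shouldStop(key)) {
	//           return ConsumerResult.cancelNext();
	//       }
	//       handle(key);
	//       return ConsumerResult.result();
	//   });
	//   // result.isCancelled() reports whether the scan was cut short.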
	public static <K, V> void parallelizeIO(IOConsumer<IOBiConsumer<K, V>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			IOBiConsumer<K, V> consumer) throws IOException {
		Consumer<BiConsumer<K, V>> action = (cons) -> {
			try {
				iterator.consume(cons::accept);
			} catch (IOException e) {
				throw new CompletionException(e);
			}
		};
		try {
			parallelize(action, maxQueueSize, parallelism, groupSize, (k, v) -> {
				try {
					consumer.consume(k, v);
				} catch (IOException ex) {
					throw new CompletionException(ex);
				}
			});
		} catch (CompletionException ex) {
			if (ex.getCause() instanceof CompletionException && ex.getCause().getCause() instanceof IOException) {
				throw (IOException) ex.getCause().getCause();
			} else if (ex.getCause() instanceof IOException) {
				throw (IOException) ex.getCause();
			} else {
				throw new IOException(ex);
			}
		}
	}

	public static <K, V> void parallelize(Consumer<BiConsumer<K, V>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			BiConsumer<K, V> consumer) throws CompletionException {
		if (parallelism <= 1) {
			iterator.accept(consumer);
		} else {
			var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
					parallelism,
					0,
					TimeUnit.MILLISECONDS,
					new ShortNamedThreadFactory("ForEachParallel"),
					(a, b) -> {}
			);
			final int CHUNK_SIZE = groupSize;
			IntWrapper count = new IntWrapper(CHUNK_SIZE);
			VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
			VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
			AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
			final Object arraysAccessLock = new Object();
			iterator.accept((key, value) -> {
				synchronized (arraysAccessLock) {
					var firstException = firstExceptionReference.get();
					if (firstException != null) {
						throw firstException;
					}

					keys.var[CHUNK_SIZE - count.var] = key;
					values.var[CHUNK_SIZE - count.var] = value;
					count.var--;

					if (count.var == 0) {
						sendChunkItems(keys, values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
					}
				}
			});
			parallelExecutor.shutdown();
			try {
				parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
			} catch (InterruptedException e) {
				throw new RuntimeException("Parallel forEach interrupted", e);
			}
			synchronized (arraysAccessLock) {
				if (count.var > 0) {
					sendChunkItems(keys, values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
				}
			}
			var firstException = firstExceptionReference.get();
			if (firstException != null) {
				throw firstException;
			}
		}
	}

	private static <K, V> void sendChunkItems(VariableWrapper<Object[]> keys,
			VariableWrapper<Object[]> values,
			final int CHUNK_SIZE,
			IntWrapper count,
			BiConsumer<K, V> consumer,
			BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
			AtomicReference<CompletionException> firstExceptionReference) {
		int itemsCount = CHUNK_SIZE - count.var;
		count.var = CHUNK_SIZE;
		Object[] keysCopy = keys.var;
		Object[] valuesCopy = values.var;
		keys.var = new Object[itemsCount];
		values.var = new Object[itemsCount];
		try {
			Runnable action = () -> {
				for (int i = 0; i < itemsCount; i++) {
					try {
						//noinspection unchecked
						consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
					} catch (Exception ex) {
						firstExceptionReference.compareAndSet(null, new CompletionException(ex));
						break;
					}
				}
			};
			if (parallelExecutor != null) {
				parallelExecutor.execute(action);
			} else {
				action.run();
			}
		} catch (RejectedExecutionException e) {
			throw new CompletionException(e);
		}
	}
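	// Illustrative sketch (assumption, not from the original file): any Map can drive the
	// key-value overload above, because Map::forEach matches Consumer<BiConsumer<K, V>>.
	// The map, the sizes and store(...) below are hypothetical.
	//
	//   Map<String, byte[]> blobs = loadBlobs();
	//   ParallelUtils.parallelize(blobs::forEach, 1000, 8, 50, (name, data) -> store(name, data));
	//
	// With parallelism <= 1 the entries are consumed inline on the calling thread instead.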
	public static <K, V> ConsumerResult parallelize(Consumer<CancellableBiConsumer<K, V>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			CancellableBiConsumer<K, V> consumer) throws CompletionException {
		if (parallelism <= 1) {
			iterator.accept(consumer);
			return ConsumerResult.result();
		} else {
			var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
					parallelism,
					0,
					TimeUnit.MILLISECONDS,
					new ShortNamedThreadFactory("ForEachParallel"),
					(a, b) -> {}
			);
			final int CHUNK_SIZE = groupSize;
			IntWrapper count = new IntWrapper(CHUNK_SIZE);
			VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
			VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
			AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
			AtomicBoolean cancelled = new AtomicBoolean(false);
			final Object arraysAccessLock = new Object();
			iterator.accept((key, value) -> {
				synchronized (arraysAccessLock) {
					var firstException = firstExceptionReference.get();
					if (firstException != null) {
						throw firstException;
					}
					var cancelledVal = cancelled.get();
					if (cancelledVal) {
						return ConsumerResult.cancelNext();
					}

					keys.var[CHUNK_SIZE - count.var] = key;
					values.var[CHUNK_SIZE - count.var] = value;
					count.var--;

					if (count.var == 0) {
						return sendChunkItems(keys,
								values,
								CHUNK_SIZE,
								count,
								consumer,
								parallelExecutor,
								firstExceptionReference,
								cancelled
						);
					} else {
						return ConsumerResult.result();
					}
				}
			});
			parallelExecutor.shutdown();
			try {
				parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
			} catch (InterruptedException e) {
				throw new RuntimeException("Parallel forEach interrupted", e);
			}
			synchronized (arraysAccessLock) {
				if (count.var > 0) {
					var sendChunkItemsResult = sendChunkItems(keys,
							values,
							CHUNK_SIZE,
							count,
							consumer,
							null,
							firstExceptionReference,
							cancelled
					);
					cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled());
				}
			}
			var firstException = firstExceptionReference.get();
			if (firstException != null) {
				throw firstException;
			}
			if (cancelled.get()) {
				return ConsumerResult.cancelNext();
			} else {
				return ConsumerResult.result();
			}
		}
	}

	private static <K, V> ConsumerResult sendChunkItems(VariableWrapper<Object[]> keys,
			VariableWrapper<Object[]> values,
			final int CHUNK_SIZE,
			IntWrapper count,
			CancellableBiConsumer<K, V> consumer,
			BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
			AtomicReference<CompletionException> firstExceptionReference,
			AtomicBoolean cancelled) {
		int itemsCount = CHUNK_SIZE - count.var;
		count.var = CHUNK_SIZE;
		Object[] keysCopy = keys.var;
		Object[] valuesCopy = values.var;
		keys.var = new Object[itemsCount];
		values.var = new Object[itemsCount];
		try {
			Supplier<ConsumerResult> action = () -> {
				for (int i = 0; i < itemsCount; i++) {
					try {
						//noinspection unchecked
						if (consumer.acceptCancellable((K) keysCopy[i], (V) valuesCopy[i]).isCancelled()) {
							cancelled.set(true);
							return ConsumerResult.cancelNext();
						}
					} catch (Exception ex) {
						firstExceptionReference.compareAndSet(null, new CompletionException(ex));
						return ConsumerResult.cancelNext();
					}
				}
				return ConsumerResult.result();
			};
			if (parallelExecutor != null) {
				parallelExecutor.execute(action::get);
				return ConsumerResult.result();
			} else {
				return action.get();
			}
		} catch (RejectedExecutionException e) {
			throw new CompletionException(e);
		}
	}

	public static <K1, K2, V> void parallelizeIO(IOConsumer<IOTriConsumer<K1, K2, V>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			IOTriConsumer<K1, K2, V> consumer) throws IOException {
		Consumer<TriConsumer<K1, K2, V>> action = (cons) -> {
			try {
				iterator.consume(cons::accept);
			} catch (IOException e) {
				throw new CompletionException(e);
			}
		};
		try {
			parallelize(action, maxQueueSize, parallelism, groupSize, (k1, k2, v) -> {
				try {
					consumer.accept(k1, k2, v);
				} catch (IOException ex) {
					throw new CompletionException(ex);
				}
			});
		} catch (CompletionException ex) {
			if (ex.getCause() instanceof CompletionException && ex.getCause().getCause() instanceof IOException) {
				throw (IOException) ex.getCause().getCause();
			} else if (ex.getCause() instanceof IOException) {
				throw (IOException) ex.getCause();
			} else {
				throw new IOException(ex);
			}
		}
	}
	public static <K1, K2, V> void parallelize(Consumer<TriConsumer<K1, K2, V>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			TriConsumer<K1, K2, V> consumer) throws CompletionException {
		var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
				parallelism,
				0,
				TimeUnit.MILLISECONDS,
				new ShortNamedThreadFactory("ForEachParallel"),
				(a, b) -> {}
		);
		final int CHUNK_SIZE = groupSize;
		IntWrapper count = new IntWrapper(CHUNK_SIZE);
		VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
		VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
		VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
		AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
		final Object arraysAccessLock = new Object();
		iterator.accept((key1, key2, value) -> {
			synchronized (arraysAccessLock) {
				var firstException = firstExceptionReference.get();
				if (firstException != null) {
					throw firstException;
				}

				keys1.var[CHUNK_SIZE - count.var] = key1;
				keys2.var[CHUNK_SIZE - count.var] = key2;
				values.var[CHUNK_SIZE - count.var] = value;
				count.var--;

				if (count.var == 0) {
					sendChunkItems(keys1, keys2, values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
				}
			}
		});
		parallelExecutor.shutdown();
		try {
			parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
		} catch (InterruptedException e) {
			throw new RuntimeException("Parallel forEach interrupted", e);
		}
		synchronized (arraysAccessLock) {
			if (count.var > 0) {
				sendChunkItems(keys1, keys2, values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
			}
		}
		var firstException = firstExceptionReference.get();
		if (firstException != null) {
			throw firstException;
		}
	}

	private static <K1, K2, V> void sendChunkItems(VariableWrapper<Object[]> keys1,
			VariableWrapper<Object[]> keys2,
			VariableWrapper<Object[]> values,
			int CHUNK_SIZE,
			IntWrapper count,
			TriConsumer<K1, K2, V> consumer,
			BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
			AtomicReference<CompletionException> firstExceptionReference) {
		int itemsCount = CHUNK_SIZE - count.var;
		count.var = CHUNK_SIZE;
		Object[] keys1Copy = keys1.var;
		Object[] keys2Copy = keys2.var;
		Object[] valuesCopy = values.var;
		keys1.var = new Object[itemsCount];
		keys2.var = new Object[itemsCount];
		values.var = new Object[itemsCount];
		try {
			Runnable action = () -> {
				for (int i = 0; i < itemsCount; i++) {
					try {
						//noinspection unchecked
						consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
					} catch (Exception ex) {
						firstExceptionReference.compareAndSet(null, new CompletionException(ex));
					}
				}
			};
			if (parallelExecutor != null) {
				parallelExecutor.execute(action);
			} else {
				action.run();
			}
		} catch (RejectedExecutionException e) {
			throw new CompletionException(e);
		}
	}
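	// Illustrative sketch (assumption, not from the original file): a three-argument source such
	// as a hypothetical table.forEachCell(row, column, cellValue) can drive the TriConsumer
	// overload above in the same way Map::forEach drives the two-argument one; index(...) is a
	// hypothetical consumer.
	//
	//   ParallelUtils.parallelize(table::forEachCell, 1000, 8, 50,
	//       (row, column, cell) -> index(row, column, cell));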
	public static <K1, K2, V> ConsumerResult parallelize(Consumer<CancellableTriConsumer<K1, K2, V>> iterator,
			int maxQueueSize,
			int parallelism,
			int groupSize,
			CancellableTriConsumer<K1, K2, V> consumer) throws CompletionException {
		if (parallelism <= 1) {
			iterator.accept(consumer);
			return ConsumerResult.result();
		} else {
			var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
					parallelism,
					0,
					TimeUnit.MILLISECONDS,
					new ShortNamedThreadFactory("ForEachParallel"),
					(a, b) -> {}
			);
			final int CHUNK_SIZE = groupSize;
			IntWrapper count = new IntWrapper(CHUNK_SIZE);
			VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
			VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
			VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
			AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
			AtomicBoolean cancelled = new AtomicBoolean(false);
			final Object arraysAccessLock = new Object();
			iterator.accept((key1, key2, value) -> {
				synchronized (arraysAccessLock) {
					var firstException = firstExceptionReference.get();
					if (firstException != null) {
						throw firstException;
					}
					var cancelledVal = cancelled.get();
					if (cancelledVal) {
						return ConsumerResult.cancelNext();
					}

					keys1.var[CHUNK_SIZE - count.var] = key1;
					keys2.var[CHUNK_SIZE - count.var] = key2;
					values.var[CHUNK_SIZE - count.var] = value;
					count.var--;

					if (count.var == 0) {
						return sendChunkItems(keys1,
								keys2,
								values,
								CHUNK_SIZE,
								count,
								consumer,
								parallelExecutor,
								firstExceptionReference,
								cancelled
						);
					} else {
						return ConsumerResult.result();
					}
				}
			});
			parallelExecutor.shutdown();
			try {
				parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
			} catch (InterruptedException e) {
				throw new RuntimeException("Parallel forEach interrupted", e);
			}
			synchronized (arraysAccessLock) {
				if (count.var > 0) {
					var sendChunkItemsResult = sendChunkItems(keys1,
							keys2,
							values,
							CHUNK_SIZE,
							count,
							consumer,
							null,
							firstExceptionReference,
							cancelled
					);
					cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled());
				}
			}
			var firstException = firstExceptionReference.get();
			if (firstException != null) {
				throw firstException;
			}
			if (cancelled.get()) {
				return ConsumerResult.cancelNext();
			} else {
				return ConsumerResult.result();
			}
		}
	}

	private static <K1, K2, V> ConsumerResult sendChunkItems(VariableWrapper<Object[]> keys1,
			VariableWrapper<Object[]> keys2,
			VariableWrapper<Object[]> values,
			final int CHUNK_SIZE,
			IntWrapper count,
			CancellableTriConsumer<K1, K2, V> consumer,
			BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
			AtomicReference<CompletionException> firstExceptionReference,
			AtomicBoolean cancelled) {
		int itemsCount = CHUNK_SIZE - count.var;
		count.var = CHUNK_SIZE;
		Object[] keys1Copy = keys1.var;
		Object[] keys2Copy = keys2.var;
		Object[] valuesCopy = values.var;
		keys1.var = new Object[itemsCount];
		keys2.var = new Object[itemsCount];
		values.var = new Object[itemsCount];
		try {
			Supplier<ConsumerResult> action = () -> {
				for (int i = 0; i < itemsCount; i++) {
					try {
						//noinspection unchecked
						if (consumer.acceptCancellable((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]).isCancelled()) {
							cancelled.set(true);
							return ConsumerResult.cancelNext();
						}
					} catch (Exception ex) {
						firstExceptionReference.compareAndSet(null, new CompletionException(ex));
						return ConsumerResult.cancelNext();
					}
				}
				return ConsumerResult.result();
			};
			if (parallelExecutor != null) {
				parallelExecutor.execute(action::get);
				return ConsumerResult.result();
			} else {
				return action.get();
			}
		} catch (RejectedExecutionException e) {
			throw new CompletionException(e);
		}
	}
}