diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java index d267c2f..093b7ff 100644 --- a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -4,11 +4,17 @@ import java.io.IOException; import java.util.concurrent.CompletionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import org.warp.commonutils.concurrency.executor.BlockingOnFullQueueExecutorServiceDecorator; import org.warp.commonutils.concurrency.executor.BoundedExecutorService; +import org.warp.commonutils.functional.CancellableBiConsumer; +import org.warp.commonutils.functional.CancellableConsumer; +import org.warp.commonutils.functional.CancellableTriConsumer; +import org.warp.commonutils.functional.ConsumerResult; import org.warp.commonutils.functional.IOBiConsumer; import org.warp.commonutils.functional.IOConsumer; import org.warp.commonutils.functional.IOTriConsumer; @@ -130,6 +136,126 @@ public class ParallelUtils { } } + public static ConsumerResult parallelize(Consumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, + CancellableConsumer consumer) throws CompletionException { + if (parallelism <= 1) { + iterator.accept(consumer); + return ConsumerResult.result(); + } else { + var parallelExecutor = BoundedExecutorService.create(maxQueueSize, + parallelism, + 0, + TimeUnit.MILLISECONDS, + new ShortNamedThreadFactory("ForEachParallel"), + (a, b) -> {} + ); + final int CHUNK_SIZE = groupSize; + IntWrapper count = new IntWrapper(CHUNK_SIZE); + VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); + AtomicReference firstExceptionReference = new AtomicReference<>(null); + AtomicBoolean cancelled = new AtomicBoolean(false); + final Object arraysAccessLock = new Object(); + iterator.accept((key) -> { + synchronized (arraysAccessLock) { + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + var cancelledVal = cancelled.get(); + if (cancelledVal) { + return ConsumerResult.cancelNext(); + } + + keys.var[CHUNK_SIZE - count.var] = key; + count.var--; + + if (count.var == 0) { + return sendChunkItems(keys, + CHUNK_SIZE, + count, + consumer, + parallelExecutor, + firstExceptionReference, + cancelled + ); + } else { + return ConsumerResult.result(); + } + } + }); + parallelExecutor.shutdown(); + try { + parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new RuntimeException("Parallel forEach interrupted", e); + } + synchronized (arraysAccessLock) { + if (count.var > 0) { + var sendChunkItemsResult = sendChunkItems(keys, + CHUNK_SIZE, + count, + consumer, + null, + firstExceptionReference, + cancelled + ); + cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled()); + } + } + + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + if (cancelled.get()) { + return ConsumerResult.cancelNext(); + } else { + return ConsumerResult.result(); + } + } + } + + private static ConsumerResult sendChunkItems(VariableWrapper keys, + final int CHUNK_SIZE, + IntWrapper count, + CancellableConsumer consumer, + BlockingOnFullQueueExecutorServiceDecorator parallelExecutor, + AtomicReference firstExceptionReference, + AtomicBoolean cancelled) { + int itemsCount = CHUNK_SIZE - count.var; + count.var = CHUNK_SIZE; + Object[] keysCopy = keys.var; + keys.var = new Object[itemsCount]; + try { + Supplier action = () -> { + for (int i = 0; i < itemsCount; i++) { + try { + //noinspection unchecked + if (consumer.acceptCancellable((K) keysCopy[i]).isCancelled()) { + cancelled.set(true); + return ConsumerResult.cancelNext(); + } + } catch (Exception ex) { + firstExceptionReference.compareAndSet(null, new CompletionException(ex)); + return ConsumerResult.cancelNext(); + } + } + return ConsumerResult.result(); + }; + if (parallelExecutor != null) { + parallelExecutor.execute(action::get); + return ConsumerResult.result(); + } else { + return action.get(); + } + } catch (RejectedExecutionException e) { + throw new CompletionException(e); + } + } + public static void parallelizeIO(IOConsumer> iterator, int maxQueueSize, int parallelism, @@ -252,6 +378,133 @@ public class ParallelUtils { } } + public static ConsumerResult parallelize(Consumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, + CancellableBiConsumer consumer) throws CompletionException { + if (parallelism <= 1) { + iterator.accept(consumer); + return ConsumerResult.result(); + } else { + var parallelExecutor = BoundedExecutorService.create(maxQueueSize, + parallelism, + 0, + TimeUnit.MILLISECONDS, + new ShortNamedThreadFactory("ForEachParallel"), + (a, b) -> {} + ); + final int CHUNK_SIZE = groupSize; + IntWrapper count = new IntWrapper(CHUNK_SIZE); + VariableWrapper keys = new VariableWrapper<>(new Object[CHUNK_SIZE]); + VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); + AtomicReference firstExceptionReference = new AtomicReference<>(null); + AtomicBoolean cancelled = new AtomicBoolean(false); + final Object arraysAccessLock = new Object(); + iterator.accept((key, value) -> { + synchronized (arraysAccessLock) { + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + var cancelledVal = cancelled.get(); + if (cancelledVal) { + return ConsumerResult.cancelNext(); + } + + keys.var[CHUNK_SIZE - count.var] = key; + values.var[CHUNK_SIZE - count.var] = value; + count.var--; + + if (count.var == 0) { + return sendChunkItems(keys, + values, + CHUNK_SIZE, + count, + consumer, + parallelExecutor, + firstExceptionReference, + cancelled + ); + } else { + return ConsumerResult.result(); + } + } + }); + parallelExecutor.shutdown(); + try { + parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new RuntimeException("Parallel forEach interrupted", e); + } + synchronized (arraysAccessLock) { + if (count.var > 0) { + var sendChunkItemsResult = sendChunkItems(keys, + values, + CHUNK_SIZE, + count, + consumer, + null, + firstExceptionReference, + cancelled + ); + cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled()); + } + } + + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + if (cancelled.get()) { + return ConsumerResult.cancelNext(); + } else { + return ConsumerResult.result(); + } + } + } + + private static ConsumerResult sendChunkItems(VariableWrapper keys, + VariableWrapper values, + final int CHUNK_SIZE, + IntWrapper count, + CancellableBiConsumer consumer, + BlockingOnFullQueueExecutorServiceDecorator parallelExecutor, + AtomicReference firstExceptionReference, + AtomicBoolean cancelled) { + int itemsCount = CHUNK_SIZE - count.var; + count.var = CHUNK_SIZE; + Object[] keysCopy = keys.var; + Object[] valuesCopy = values.var; + keys.var = new Object[itemsCount]; + values.var = new Object[itemsCount]; + try { + Supplier action = () -> { + for (int i = 0; i < itemsCount; i++) { + try { + //noinspection unchecked + if (consumer.acceptCancellable((K) keysCopy[i], (V) valuesCopy[i]).isCancelled()) { + cancelled.set(true); + return ConsumerResult.cancelNext(); + } + } catch (Exception ex) { + firstExceptionReference.compareAndSet(null, new CompletionException(ex)); + return ConsumerResult.cancelNext(); + } + } + return ConsumerResult.result(); + }; + if (parallelExecutor != null) { + parallelExecutor.execute(action::get); + return ConsumerResult.result(); + } else { + return action.get(); + } + } catch (RejectedExecutionException e) { + throw new CompletionException(e); + } + } + public static void parallelizeIO(IOConsumer> iterator, int maxQueueSize, int parallelism, @@ -373,4 +626,138 @@ public class ParallelUtils { throw new CompletionException(e); } } + + public static ConsumerResult parallelize(Consumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, + CancellableTriConsumer consumer) throws CompletionException { + if (parallelism <= 1) { + iterator.accept(consumer); + return ConsumerResult.result(); + } else { + var parallelExecutor = BoundedExecutorService.create(maxQueueSize, + parallelism, + 0, + TimeUnit.MILLISECONDS, + new ShortNamedThreadFactory("ForEachParallel"), + (a, b) -> {} + ); + final int CHUNK_SIZE = groupSize; + IntWrapper count = new IntWrapper(CHUNK_SIZE); + VariableWrapper keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]); + VariableWrapper keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]); + VariableWrapper values = new VariableWrapper<>(new Object[CHUNK_SIZE]); + AtomicReference firstExceptionReference = new AtomicReference<>(null); + AtomicBoolean cancelled = new AtomicBoolean(false); + final Object arraysAccessLock = new Object(); + iterator.accept((key1, key2, value) -> { + synchronized (arraysAccessLock) { + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + var cancelledVal = cancelled.get(); + if (cancelledVal) { + return ConsumerResult.cancelNext(); + } + + keys1.var[CHUNK_SIZE - count.var] = key1; + keys2.var[CHUNK_SIZE - count.var] = key2; + values.var[CHUNK_SIZE - count.var] = value; + count.var--; + + if (count.var == 0) { + return sendChunkItems(keys1, + keys2, + values, + CHUNK_SIZE, + count, + consumer, + parallelExecutor, + firstExceptionReference, + cancelled + ); + } else { + return ConsumerResult.result(); + } + } + }); + parallelExecutor.shutdown(); + try { + parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); + } catch (InterruptedException e) { + throw new RuntimeException("Parallel forEach interrupted", e); + } + synchronized (arraysAccessLock) { + if (count.var > 0) { + var sendChunkItemsResult = sendChunkItems(keys1, + keys2, + values, + CHUNK_SIZE, + count, + consumer, + null, + firstExceptionReference, + cancelled + ); + cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled()); + } + } + + var firstException = firstExceptionReference.get(); + if (firstException != null) { + throw firstException; + } + if (cancelled.get()) { + return ConsumerResult.cancelNext(); + } else { + return ConsumerResult.result(); + } + } + } + + private static ConsumerResult sendChunkItems(VariableWrapper keys1, + VariableWrapper keys2, + VariableWrapper values, + final int CHUNK_SIZE, + IntWrapper count, + CancellableTriConsumer consumer, + BlockingOnFullQueueExecutorServiceDecorator parallelExecutor, + AtomicReference firstExceptionReference, + AtomicBoolean cancelled) { + int itemsCount = CHUNK_SIZE - count.var; + count.var = CHUNK_SIZE; + Object[] keys1Copy = keys1.var; + Object[] keys2Copy = keys2.var; + Object[] valuesCopy = values.var; + keys1.var = new Object[itemsCount]; + keys2.var = new Object[itemsCount]; + values.var = new Object[itemsCount]; + try { + Supplier action = () -> { + for (int i = 0; i < itemsCount; i++) { + try { + //noinspection unchecked + if (consumer.acceptCancellable((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]).isCancelled()) { + cancelled.set(true); + return ConsumerResult.cancelNext(); + } + } catch (Exception ex) { + firstExceptionReference.compareAndSet(null, new CompletionException(ex)); + return ConsumerResult.cancelNext(); + } + } + return ConsumerResult.result(); + }; + if (parallelExecutor != null) { + parallelExecutor.execute(action::get); + return ConsumerResult.result(); + } else { + return action.get(); + } + } catch (RejectedExecutionException e) { + throw new CompletionException(e); + } + } } diff --git a/src/main/java/org/warp/commonutils/functional/CancellableBiConsumer.java b/src/main/java/org/warp/commonutils/functional/CancellableBiConsumer.java new file mode 100644 index 0000000..7d55970 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/CancellableBiConsumer.java @@ -0,0 +1,13 @@ +package org.warp.commonutils.functional; + +public interface CancellableBiConsumer { //extends BiConsumer { + + /** + * @return false to cancel + */ + ConsumerResult acceptCancellable(T t, U u); + + /*default void accept(T t, U u) { + acceptCancellable(t, u); + }*/ +} diff --git a/src/main/java/org/warp/commonutils/functional/CancellableBiFunction.java b/src/main/java/org/warp/commonutils/functional/CancellableBiFunction.java new file mode 100644 index 0000000..610fca3 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/CancellableBiFunction.java @@ -0,0 +1,16 @@ +package org.warp.commonutils.functional; + +public interface CancellableBiFunction { //extends BiFunction { + + OperationResult applyCancellable(T t, U u); + + /* default V apply(T t, U u) { + var result = applyCancellable(t, u); + if (result == OperationResult.CANCEL) { + throw new UnsupportedOperationException("Can't cancel this operation"); + } + //noinspection unchecked + return (V) result; + } + */ +} diff --git a/src/main/java/org/warp/commonutils/functional/CancellableConsumer.java b/src/main/java/org/warp/commonutils/functional/CancellableConsumer.java new file mode 100644 index 0000000..026d8b4 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/CancellableConsumer.java @@ -0,0 +1,13 @@ +package org.warp.commonutils.functional; + +public interface CancellableConsumer { //extends Consumer { + + /** + * @return false to cancel + */ + ConsumerResult acceptCancellable(T t); + + /*default void accept(T t) { + acceptCancellable(t); + }*/ +} diff --git a/src/main/java/org/warp/commonutils/functional/CancellableFunction.java b/src/main/java/org/warp/commonutils/functional/CancellableFunction.java new file mode 100644 index 0000000..180938c --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/CancellableFunction.java @@ -0,0 +1,15 @@ +package org.warp.commonutils.functional; + +public interface CancellableFunction { //extends Function { + + OperationResult applyCancellable(T t); + + /*default U apply(T t) { + var result = applyCancellable(t); + if (result == OperationResult.CANCEL) { + throw new UnsupportedOperationException("Can't cancel this operation"); + } + //noinspection unchecked + return (U) result; + }*/ +} diff --git a/src/main/java/org/warp/commonutils/functional/CancellableTriConsumer.java b/src/main/java/org/warp/commonutils/functional/CancellableTriConsumer.java new file mode 100644 index 0000000..35ec4c0 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/CancellableTriConsumer.java @@ -0,0 +1,13 @@ +package org.warp.commonutils.functional; + +public interface CancellableTriConsumer { //extends BiConsumer { + + /** + * @return false to cancel + */ + ConsumerResult acceptCancellable(T t, U u, V v); + + /*default void accept(T t, U u) { + acceptCancellable(t, u); + }*/ +} diff --git a/src/main/java/org/warp/commonutils/functional/CancellableTriFunction.java b/src/main/java/org/warp/commonutils/functional/CancellableTriFunction.java new file mode 100644 index 0000000..a4aca84 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/CancellableTriFunction.java @@ -0,0 +1,16 @@ +package org.warp.commonutils.functional; + +public interface CancellableTriFunction { //extends BiFunction { + + OperationResult applyCancellable(T t, U u, V v); + + /* default V apply(T t, U u) { + var result = applyCancellable(t, u); + if (result == OperationResult.CANCEL) { + throw new UnsupportedOperationException("Can't cancel this operation"); + } + //noinspection unchecked + return (V) result; + } + */ +} diff --git a/src/main/java/org/warp/commonutils/functional/ConsumerResult.java b/src/main/java/org/warp/commonutils/functional/ConsumerResult.java new file mode 100644 index 0000000..7548798 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/ConsumerResult.java @@ -0,0 +1,62 @@ +package org.warp.commonutils.functional; + +import java.util.StringJoiner; +import java.util.concurrent.CancellationException; + +public final class ConsumerResult { + + private final boolean cancel; + + private ConsumerResult(boolean cancel) { + this.cancel = cancel; + } + + public static ConsumerResult cancelNext() { + return new ConsumerResult(true); + } + + public static ConsumerResult result() { + return new ConsumerResult(false); + } + + public boolean isCancelled() { + return cancel; + } + + public void throwIfCancelled() { + if (cancel) { + throw new CancellationException("Operation cancelled"); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ConsumerResult that = (ConsumerResult) o; + + return cancel == that.cancel; + } + + @Override + public int hashCode() { + return (cancel ? 1 : 0); + } + + @Override + public String toString() { + return new StringJoiner(", ", ConsumerResult.class.getSimpleName() + "[", "]").add("cancel=" + cancel).toString(); + } + + public ConsumerResult or(ConsumerResult otherResult) { + if (otherResult.cancel) { + return otherResult; + } + return this; + } +} diff --git a/src/main/java/org/warp/commonutils/functional/OperationResult.java b/src/main/java/org/warp/commonutils/functional/OperationResult.java new file mode 100644 index 0000000..6387ded --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/OperationResult.java @@ -0,0 +1,75 @@ +package org.warp.commonutils.functional; + +import java.util.Objects; +import java.util.StringJoiner; + +public final class OperationResult { + + private final boolean cancel; + private final T value; + + private OperationResult(boolean cancel, T value) { + this.cancel = cancel; + this.value = value; + } + + public static OperationResult cancelNext(T value) { + return new OperationResult<>(true, value); + } + + public static OperationResult result(T value) { + return new OperationResult<>(false, value); + } + + public static OperationResult of(boolean cancel, T value) { + return new OperationResult<>(cancel, value); + } + + public boolean isCancelled() { + return cancel; + } + + public T getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OperationResult that = (OperationResult) o; + + if (cancel != that.cancel) { + return false; + } + return Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + int result = (cancel ? 1 : 0); + result = 31 * result + (value != null ? value.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return new StringJoiner(", ", OperationResult.class.getSimpleName() + "[", "]") + .add("cancel=" + cancel) + .add("value=" + value) + .toString(); + } + + public OperationResult copyStatusWith(X newResults) { + if (cancel) { + return OperationResult.cancelNext(newResults); + } else { + return OperationResult.result(newResults); + } + } +}