common-utils/src/main/java/org/warp/commonutils/batch/ParallelUtils.java

377 lines
12 KiB
Java

package org.warp.commonutils.batch;
import java.io.IOException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.warp.commonutils.concurrency.executor.BlockingOnFullQueueExecutorServiceDecorator;
import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
import org.warp.commonutils.functional.IOBiConsumer;
import org.warp.commonutils.functional.IOConsumer;
import org.warp.commonutils.functional.IOTriConsumer;
import org.warp.commonutils.functional.TriConsumer;
import org.warp.commonutils.type.IntWrapper;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import org.warp.commonutils.type.VariableWrapper;
public class ParallelUtils {
/**
 * IO-aware front end for the single-value {@code parallelize} overload.
 *
 * <p>Both the source and the consumer may throw {@link IOException}. Each such exception is
 * tunnelled through {@code parallelize} inside a {@link CompletionException} and unwrapped
 * again here, so the caller sees the original {@link IOException}. A
 * {@link CompletionException} that does not carry an {@link IOException} (directly or one
 * level down) is rethrown wrapped in a new {@link IOException}.
 *
 * @param iterator source that pushes values into the callback it receives
 * @param maxQueueSize forwarded unchanged to {@code parallelize}
 * @param parallelism forwarded unchanged to {@code parallelize}
 * @param groupSize forwarded unchanged to {@code parallelize}
 * @param consumer action applied to every value
 * @throws IOException the unwrapped I/O failure, or a wrapper around any other
 *     {@link CompletionException} raised by {@code parallelize}
 */
public static <V> void parallelizeIO(IOConsumer<IOConsumer<V>> iterator,
    int maxQueueSize,
    int parallelism,
    int groupSize,
    IOConsumer<V> consumer) throws IOException {
  // Typed locals (rather than inline lambdas) keep overload resolution of parallelize unambiguous.
  Consumer<Consumer<V>> uncheckedSource = (sink) -> {
    try {
      iterator.consume(sink::accept);
    } catch (IOException ioFailure) {
      throw new CompletionException(ioFailure);
    }
  };
  Consumer<V> uncheckedConsumer = (item) -> {
    try {
      consumer.consume(item);
    } catch (IOException ioFailure) {
      throw new CompletionException(ioFailure);
    }
  };

  try {
    parallelize(uncheckedSource, maxQueueSize, parallelism, groupSize, uncheckedConsumer);
  } catch (CompletionException failure) {
    Throwable cause = failure.getCause();
    // Consumer failures arrive double-wrapped: CompletionException(CompletionException(IOException)).
    if (cause instanceof CompletionException) {
      Throwable nested = cause.getCause();
      if (nested instanceof IOException) {
        throw (IOException) nested;
      }
    }
    // Source failures arrive single-wrapped.
    if (cause instanceof IOException) {
      throw (IOException) cause;
    }
    throw new IOException(failure);
  }
}
/**
 * Applies {@code consumer} to every value pushed by {@code iterator}, batching values into
 * chunks of {@code groupSize} and submitting each full chunk as one task to an executor
 * created by {@code BoundedExecutorService.create}.
 *
 * <p>The callback handed to {@code iterator} buffers values while holding an internal lock.
 * When a consumer failure has already been recorded, the next call to that callback rethrows
 * it, which aborts the iteration. Values left over in a partially filled chunk are processed
 * on the calling thread after the executor has terminated.
 *
 * <p>The executor is always shut down, even when {@code iterator} throws, so its worker
 * threads are not leaked.
 *
 * @param iterator source that pushes values into the callback it receives
 * @param maxQueueSize forwarded to {@code BoundedExecutorService.create}
 * @param parallelism forwarded to {@code BoundedExecutorService.create}
 * @param groupSize number of values per submitted chunk
 * @param consumer action applied to every value
 * @throws CompletionException wrapping the first recorded consumer failure, or a
 *     {@code RejectedExecutionException} raised while submitting a chunk
 * @throws RuntimeException if the calling thread is interrupted while waiting for the
 *     executor to terminate; the interrupt status is restored before throwing
 */
public static <V> void parallelize(Consumer<Consumer<V>> iterator,
    int maxQueueSize,
    int parallelism,
    int groupSize, Consumer<V> consumer) throws CompletionException {
  final int CHUNK_SIZE = groupSize;
  // Allocate the buffers before the executor so that an invalid groupSize cannot leak threads.
  IntWrapper count = new IntWrapper(CHUNK_SIZE);
  VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
  AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
  final Object arraysAccessLock = new Object();

  var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
      parallelism,
      0,
      TimeUnit.MILLISECONDS,
      new ShortNamedThreadFactory("ForEachParallel"),
      (a, b) -> {}
  );
  try {
    iterator.accept((value) -> {
      synchronized (arraysAccessLock) {
        // Fail fast: stop accepting values once any chunk has reported a failure.
        var firstException = firstExceptionReference.get();
        if (firstException != null) {
          throw firstException;
        }
        // count.var counts the free slots left, so the next free index is CHUNK_SIZE - count.var.
        values.var[CHUNK_SIZE - count.var] = value;
        count.var--;
        if (count.var == 0) {
          sendChunkItems(values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
        }
      }
    });
  } finally {
    // Must run even when the iterator throws, otherwise the worker threads are never released.
    parallelExecutor.shutdown();
  }

  try {
    parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
  } catch (InterruptedException e) {
    // Preserve the interrupt for callers further up the stack.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Parallel forEach interrupted", e);
  }

  synchronized (arraysAccessLock) {
    // Flush the leftover values (if any) on the calling thread.
    if (count.var < CHUNK_SIZE) {
      sendChunkItems(values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
    }
  }

  var firstException = firstExceptionReference.get();
  if (firstException != null) {
    throw firstException;
  }
}
/**
 * Detaches the values buffered so far and runs {@code consumer} over them, either on
 * {@code parallelExecutor} or, when that is {@code null}, on the calling thread.
 *
 * <p>The buffer in {@code values} is replaced by a fresh array of {@code CHUNK_SIZE} slots and
 * {@code count} is reset to {@code CHUNK_SIZE}, so the two always agree. Processing of the
 * chunk stops at the first consumer failure; only the first failure overall is kept in
 * {@code firstExceptionReference}.
 *
 * @param values holder of the current buffer; receives a new empty buffer
 * @param CHUNK_SIZE capacity of a full chunk
 * @param count number of free slots left in the current buffer; reset to {@code CHUNK_SIZE}
 * @param consumer action applied to every buffered value
 * @param parallelExecutor executor to submit the chunk to, or {@code null} to run inline
 * @param firstExceptionReference receives the first consumer failure, wrapped in a
 *     {@link CompletionException}
 * @throws CompletionException if the executor rejects the task
 */
private static <V> void sendChunkItems(VariableWrapper<Object[]> values,
    int CHUNK_SIZE,
    IntWrapper count,
    Consumer<V> consumer,
    BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
    AtomicReference<CompletionException> firstExceptionReference) {
  var itemsCount = CHUNK_SIZE - count.var;
  count.var = CHUNK_SIZE;
  Object[] valuesCopy = values.var;
  // Size the replacement by CHUNK_SIZE (not itemsCount) so it matches the reset counter.
  values.var = new Object[CHUNK_SIZE];
  try {
    Runnable action = () -> {
      for (int i = 0; i < itemsCount; i++) {
        try {
          //noinspection unchecked
          consumer.accept((V) valuesCopy[i]);
        } catch (Exception ex) {
          firstExceptionReference.compareAndSet(null, new CompletionException(ex));
          // Stop at the first failure instead of running the rest of the chunk.
          break;
        }
      }
    };
    if (parallelExecutor != null) {
      parallelExecutor.execute(action);
    } else {
      action.run();
    }
  } catch (RejectedExecutionException e) {
    throw new CompletionException(e);
  }
}
/**
 * IO-aware front end for the key/value {@code parallelize} overload.
 *
 * <p>{@link IOException}s thrown by the source or by the consumer are carried through
 * {@code parallelize} inside a {@link CompletionException} and unwrapped here. A
 * {@link CompletionException} that does not carry an {@link IOException} (directly or one
 * level down) is rethrown wrapped in a new {@link IOException}.
 *
 * @param iterator source that pushes key/value pairs into the callback it receives
 * @param maxQueueSize forwarded unchanged to {@code parallelize}
 * @param parallelism forwarded unchanged to {@code parallelize}
 * @param groupSize forwarded unchanged to {@code parallelize}
 * @param consumer action applied to every key/value pair
 * @throws IOException the unwrapped I/O failure, or a wrapper around any other
 *     {@link CompletionException} raised by {@code parallelize}
 */
public static <K, V> void parallelizeIO(IOConsumer<IOBiConsumer<K, V>> iterator,
    int maxQueueSize,
    int parallelism,
    int groupSize,
    IOBiConsumer<K, V> consumer) throws IOException {
  // Typed locals (rather than inline lambdas) keep overload resolution of parallelize unambiguous.
  Consumer<BiConsumer<K, V>> uncheckedSource = (sink) -> {
    try {
      iterator.consume(sink::accept);
    } catch (IOException ioFailure) {
      throw new CompletionException(ioFailure);
    }
  };
  BiConsumer<K, V> uncheckedConsumer = (key, value) -> {
    try {
      consumer.consume(key, value);
    } catch (IOException ioFailure) {
      throw new CompletionException(ioFailure);
    }
  };

  try {
    parallelize(uncheckedSource, maxQueueSize, parallelism, groupSize, uncheckedConsumer);
  } catch (CompletionException failure) {
    Throwable cause = failure.getCause();
    // Failures recorded by a worker arrive double-wrapped.
    if (cause instanceof CompletionException) {
      Throwable nested = cause.getCause();
      if (nested instanceof IOException) {
        throw (IOException) nested;
      }
    }
    // Failures thrown directly by the source or by an inline consumer arrive single-wrapped.
    if (cause instanceof IOException) {
      throw (IOException) cause;
    }
    throw new IOException(failure);
  }
}
/**
 * Applies {@code consumer} to every key/value pair pushed by {@code iterator}.
 *
 * <p>When {@code parallelism <= 1} the consumer is handed straight to the iterator and no
 * executor is created. Otherwise pairs are batched into chunks of {@code groupSize} and each
 * full chunk is submitted as one task to an executor created by
 * {@code BoundedExecutorService.create}. Pairs left over in a partially filled chunk are
 * processed on the calling thread after the executor has terminated.
 *
 * <p>The callback handed to {@code iterator} buffers pairs while holding an internal lock and
 * rethrows an already recorded consumer failure, which aborts the iteration. The executor is
 * always shut down, even when {@code iterator} throws, so its worker threads are not leaked.
 *
 * @param iterator source that pushes key/value pairs into the callback it receives
 * @param maxQueueSize forwarded to {@code BoundedExecutorService.create}
 * @param parallelism forwarded to {@code BoundedExecutorService.create}; values {@code <= 1}
 *     select the inline path
 * @param groupSize number of pairs per submitted chunk
 * @param consumer action applied to every key/value pair
 * @throws CompletionException wrapping the first recorded consumer failure, or a
 *     {@code RejectedExecutionException} raised while submitting a chunk
 * @throws RuntimeException if the calling thread is interrupted while waiting for the
 *     executor to terminate; the interrupt status is restored before throwing
 */
public static <K, V> void parallelize(Consumer<BiConsumer<K, V>> iterator,
    int maxQueueSize,
    int parallelism,
    int groupSize, BiConsumer<K, V> consumer) throws CompletionException {
  if (parallelism <= 1) {
    iterator.accept(consumer);
    return;
  }

  final int CHUNK_SIZE = groupSize;
  // Allocate the buffers before the executor so that an invalid groupSize cannot leak threads.
  IntWrapper count = new IntWrapper(CHUNK_SIZE);
  VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
  VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
  AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
  final Object arraysAccessLock = new Object();

  var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
      parallelism,
      0,
      TimeUnit.MILLISECONDS,
      new ShortNamedThreadFactory("ForEachParallel"),
      (a, b) -> {}
  );
  try {
    iterator.accept((key, value) -> {
      synchronized (arraysAccessLock) {
        // Fail fast: stop accepting pairs once any chunk has reported a failure.
        var firstException = firstExceptionReference.get();
        if (firstException != null) {
          throw firstException;
        }
        // count.var counts the free slots left, so the next free index is CHUNK_SIZE - count.var.
        keys.var[CHUNK_SIZE - count.var] = key;
        values.var[CHUNK_SIZE - count.var] = value;
        count.var--;
        if (count.var == 0) {
          sendChunkItems(keys, values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
        }
      }
    });
  } finally {
    // Must run even when the iterator throws, otherwise the worker threads are never released.
    parallelExecutor.shutdown();
  }

  try {
    parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
  } catch (InterruptedException e) {
    // Preserve the interrupt for callers further up the stack.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Parallel forEach interrupted", e);
  }

  synchronized (arraysAccessLock) {
    // Flush the leftover pairs (if any) on the calling thread.
    if (count.var < CHUNK_SIZE) {
      sendChunkItems(keys, values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
    }
  }

  var firstException = firstExceptionReference.get();
  if (firstException != null) {
    throw firstException;
  }
}
/**
 * Detaches the key/value pairs buffered so far and runs {@code consumer} over them, either on
 * {@code parallelExecutor} or, when that is {@code null}, on the calling thread.
 *
 * <p>The buffers in {@code keys} and {@code values} are replaced by fresh arrays of
 * {@code CHUNK_SIZE} slots and {@code count} is reset to {@code CHUNK_SIZE}, so they always
 * agree. Processing of the chunk stops at the first consumer failure; only the first failure
 * overall is kept in {@code firstExceptionReference}.
 *
 * @param keys holder of the current key buffer; receives a new empty buffer
 * @param values holder of the current value buffer; receives a new empty buffer
 * @param CHUNK_SIZE capacity of a full chunk
 * @param count number of free slots left in the current buffers; reset to {@code CHUNK_SIZE}
 * @param consumer action applied to every buffered pair
 * @param parallelExecutor executor to submit the chunk to, or {@code null} to run inline
 * @param firstExceptionReference receives the first consumer failure, wrapped in a
 *     {@link CompletionException}
 * @throws CompletionException if the executor rejects the task
 */
private static <K, V> void sendChunkItems(VariableWrapper<Object[]> keys,
    VariableWrapper<Object[]> values,
    final int CHUNK_SIZE,
    IntWrapper count,
    BiConsumer<K, V> consumer,
    BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
    AtomicReference<CompletionException> firstExceptionReference) {
  int itemsCount = CHUNK_SIZE - count.var;
  count.var = CHUNK_SIZE;
  Object[] keysCopy = keys.var;
  Object[] valuesCopy = values.var;
  // Size the replacements by CHUNK_SIZE (not itemsCount) so they match the reset counter.
  keys.var = new Object[CHUNK_SIZE];
  values.var = new Object[CHUNK_SIZE];
  try {
    Runnable action = () -> {
      for (int i = 0; i < itemsCount; i++) {
        try {
          //noinspection unchecked
          consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
        } catch (Exception ex) {
          firstExceptionReference.compareAndSet(null, new CompletionException(ex));
          // Stop at the first failure instead of running the rest of the chunk.
          break;
        }
      }
    };
    if (parallelExecutor != null) {
      parallelExecutor.execute(action);
    } else {
      action.run();
    }
  } catch (RejectedExecutionException e) {
    throw new CompletionException(e);
  }
}
/**
 * IO-aware front end for the three-argument {@code parallelize} overload.
 *
 * <p>{@link IOException}s thrown by the source or by the consumer are carried through
 * {@code parallelize} inside a {@link CompletionException} and unwrapped here. A
 * {@link CompletionException} that does not carry an {@link IOException} (directly or one
 * level down) is rethrown wrapped in a new {@link IOException}.
 *
 * @param iterator source that pushes (key1, key2, value) triples into the callback it receives
 * @param maxQueueSize forwarded unchanged to {@code parallelize}
 * @param parallelism forwarded unchanged to {@code parallelize}
 * @param groupSize forwarded unchanged to {@code parallelize}
 * @param consumer action applied to every triple
 * @throws IOException the unwrapped I/O failure, or a wrapper around any other
 *     {@link CompletionException} raised by {@code parallelize}
 */
public static <K1, K2, V> void parallelizeIO(IOConsumer<IOTriConsumer<K1, K2, V>> iterator,
    int maxQueueSize,
    int parallelism,
    int groupSize,
    IOTriConsumer<K1, K2, V> consumer) throws IOException {
  // Typed locals (rather than inline lambdas) keep overload resolution of parallelize unambiguous.
  Consumer<TriConsumer<K1, K2, V>> uncheckedSource = (sink) -> {
    try {
      iterator.consume(sink::accept);
    } catch (IOException ioFailure) {
      throw new CompletionException(ioFailure);
    }
  };
  TriConsumer<K1, K2, V> uncheckedConsumer = (key1, key2, value) -> {
    try {
      consumer.accept(key1, key2, value);
    } catch (IOException ioFailure) {
      throw new CompletionException(ioFailure);
    }
  };

  try {
    parallelize(uncheckedSource, maxQueueSize, parallelism, groupSize, uncheckedConsumer);
  } catch (CompletionException failure) {
    Throwable cause = failure.getCause();
    // Consumer failures arrive double-wrapped: CompletionException(CompletionException(IOException)).
    if (cause instanceof CompletionException) {
      Throwable nested = cause.getCause();
      if (nested instanceof IOException) {
        throw (IOException) nested;
      }
    }
    // Source failures arrive single-wrapped.
    if (cause instanceof IOException) {
      throw (IOException) cause;
    }
    throw new IOException(failure);
  }
}
/**
 * Applies {@code consumer} to every (key1, key2, value) triple pushed by {@code iterator},
 * batching triples into chunks of {@code groupSize} and submitting each full chunk as one task
 * to an executor created by {@code BoundedExecutorService.create}.
 *
 * <p>The callback handed to {@code iterator} buffers triples while holding an internal lock.
 * When a consumer failure has already been recorded, the next call to that callback rethrows
 * it, which aborts the iteration. Triples left over in a partially filled chunk are processed
 * on the calling thread after the executor has terminated.
 *
 * <p>The executor is always shut down, even when {@code iterator} throws, so its worker
 * threads are not leaked.
 *
 * @param iterator source that pushes triples into the callback it receives
 * @param maxQueueSize forwarded to {@code BoundedExecutorService.create}
 * @param parallelism forwarded to {@code BoundedExecutorService.create}
 * @param groupSize number of triples per submitted chunk
 * @param consumer action applied to every triple
 * @throws CompletionException wrapping the first recorded consumer failure, or a
 *     {@code RejectedExecutionException} raised while submitting a chunk
 * @throws RuntimeException if the calling thread is interrupted while waiting for the
 *     executor to terminate; the interrupt status is restored before throwing
 */
public static <K1, K2, V> void parallelize(Consumer<TriConsumer<K1, K2, V>> iterator,
    int maxQueueSize,
    int parallelism,
    int groupSize,
    TriConsumer<K1, K2, V> consumer) throws CompletionException {
  final int CHUNK_SIZE = groupSize;
  // Allocate the buffers before the executor so that an invalid groupSize cannot leak threads.
  IntWrapper count = new IntWrapper(CHUNK_SIZE);
  VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
  VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
  VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
  AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
  final Object arraysAccessLock = new Object();

  var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
      parallelism,
      0,
      TimeUnit.MILLISECONDS,
      new ShortNamedThreadFactory("ForEachParallel"),
      (a, b) -> {}
  );
  try {
    iterator.accept((key1, key2, value) -> {
      synchronized (arraysAccessLock) {
        // Fail fast: stop accepting triples once any chunk has reported a failure.
        var firstException = firstExceptionReference.get();
        if (firstException != null) {
          throw firstException;
        }
        // count.var counts the free slots left, so the next free index is CHUNK_SIZE - count.var.
        keys1.var[CHUNK_SIZE - count.var] = key1;
        keys2.var[CHUNK_SIZE - count.var] = key2;
        values.var[CHUNK_SIZE - count.var] = value;
        count.var--;
        if (count.var == 0) {
          sendChunkItems(keys1,
              keys2,
              values,
              CHUNK_SIZE,
              count,
              consumer,
              parallelExecutor,
              firstExceptionReference
          );
        }
      }
    });
  } finally {
    // Must run even when the iterator throws, otherwise the worker threads are never released.
    parallelExecutor.shutdown();
  }

  try {
    parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
  } catch (InterruptedException e) {
    // Preserve the interrupt for callers further up the stack.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Parallel forEach interrupted", e);
  }

  synchronized (arraysAccessLock) {
    // Flush the leftover triples (if any) on the calling thread.
    if (count.var < CHUNK_SIZE) {
      sendChunkItems(keys1, keys2, values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
    }
  }

  var firstException = firstExceptionReference.get();
  if (firstException != null) {
    throw firstException;
  }
}
/**
 * Detaches the (key1, key2, value) triples buffered so far and runs {@code consumer} over
 * them, either on {@code parallelExecutor} or, when that is {@code null}, on the calling
 * thread.
 *
 * <p>The three buffers are replaced by fresh arrays of {@code CHUNK_SIZE} slots and
 * {@code count} is reset to {@code CHUNK_SIZE}, so they always agree. Processing of the chunk
 * stops at the first consumer failure; only the first failure overall is kept in
 * {@code firstExceptionReference}.
 *
 * @param keys1 holder of the current first-key buffer; receives a new empty buffer
 * @param keys2 holder of the current second-key buffer; receives a new empty buffer
 * @param values holder of the current value buffer; receives a new empty buffer
 * @param CHUNK_SIZE capacity of a full chunk
 * @param count number of free slots left in the current buffers; reset to {@code CHUNK_SIZE}
 * @param consumer action applied to every buffered triple
 * @param parallelExecutor executor to submit the chunk to, or {@code null} to run inline
 * @param firstExceptionReference receives the first consumer failure, wrapped in a
 *     {@link CompletionException}
 * @throws CompletionException if the executor rejects the task
 */
private static <K1, K2, V> void sendChunkItems(VariableWrapper<Object[]> keys1,
    VariableWrapper<Object[]> keys2,
    VariableWrapper<Object[]> values,
    int CHUNK_SIZE,
    IntWrapper count,
    TriConsumer<K1, K2, V> consumer,
    BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
    AtomicReference<CompletionException> firstExceptionReference) {
  int itemsCount = CHUNK_SIZE - count.var;
  count.var = CHUNK_SIZE;
  Object[] keys1Copy = keys1.var;
  Object[] keys2Copy = keys2.var;
  Object[] valuesCopy = values.var;
  // Size the replacements by CHUNK_SIZE (not itemsCount) so they match the reset counter.
  keys1.var = new Object[CHUNK_SIZE];
  keys2.var = new Object[CHUNK_SIZE];
  values.var = new Object[CHUNK_SIZE];
  try {
    Runnable action = () -> {
      for (int i = 0; i < itemsCount; i++) {
        try {
          //noinspection unchecked
          consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
        } catch (Exception ex) {
          firstExceptionReference.compareAndSet(null, new CompletionException(ex));
          // Stop at the first failure instead of running the rest of the chunk.
          break;
        }
      }
    };
    if (parallelExecutor != null) {
      parallelExecutor.execute(action);
    } else {
      action.run();
    }
  } catch (RejectedExecutionException e) {
    throw new CompletionException(e);
  }
}
}