277 lines
9.0 KiB
Java
277 lines
9.0 KiB
Java
package org.warp.commonutils.batch;
|
|
|
|
import java.util.concurrent.CompletionException;
|
|
import java.util.concurrent.RejectedExecutionException;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.function.BiConsumer;
|
|
import java.util.function.Consumer;
|
|
import org.warp.commonutils.concurrency.executor.BlockingOnFullQueueExecutorServiceDecorator;
|
|
import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
|
|
import org.warp.commonutils.functional.TriConsumer;
|
|
import org.warp.commonutils.type.IntWrapper;
|
|
import org.warp.commonutils.type.ShortNamedThreadFactory;
|
|
import org.warp.commonutils.type.VariableWrapper;
|
|
|
|
public class ParallelUtils {
|
|
|
|
public static <V> void parallelize(Consumer<Consumer<V>> iterator,
|
|
int maxQueueSize,
|
|
int parallelism,
|
|
int groupSize, Consumer<V> consumer) throws CompletionException {
|
|
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
|
|
parallelism,
|
|
0,
|
|
TimeUnit.MILLISECONDS,
|
|
new ShortNamedThreadFactory("ForEachParallel"),
|
|
(a, b) -> {}
|
|
);
|
|
final int CHUNK_SIZE = groupSize;
|
|
IntWrapper count = new IntWrapper(CHUNK_SIZE);
|
|
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
|
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
|
|
final Object arraysAccessLock = new Object();
|
|
iterator.accept((value) -> {
|
|
synchronized (arraysAccessLock) {
|
|
var firstException = firstExceptionReference.get();
|
|
if (firstException != null) {
|
|
throw firstException;
|
|
}
|
|
|
|
values.var[CHUNK_SIZE - count.var] = value;
|
|
count.var--;
|
|
if (count.var == 0) {
|
|
sendChunkItems(values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
|
|
}
|
|
}
|
|
});
|
|
parallelExecutor.shutdown();
|
|
try {
|
|
parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
|
|
} catch (InterruptedException e) {
|
|
throw new RuntimeException("Parallel forEach interrupted", e);
|
|
}
|
|
synchronized (arraysAccessLock) {
|
|
if (count.var > 0) {
|
|
sendChunkItems(values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
|
|
}
|
|
}
|
|
|
|
var firstException = firstExceptionReference.get();
|
|
if (firstException != null) {
|
|
throw firstException;
|
|
}
|
|
}
|
|
|
|
private static <V> void sendChunkItems(VariableWrapper<Object[]> values,
|
|
int CHUNK_SIZE,
|
|
IntWrapper count,
|
|
Consumer<V> consumer,
|
|
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
|
|
AtomicReference<CompletionException> firstExceptionReference) {
|
|
var itemsCount = CHUNK_SIZE - count.var;
|
|
count.var = CHUNK_SIZE;
|
|
Object[] valuesCopy = values.var;
|
|
values.var = new Object[itemsCount];
|
|
try {
|
|
Runnable action = () -> {
|
|
for (int i = 0; i < itemsCount; i++) {
|
|
try {
|
|
//noinspection unchecked
|
|
consumer.accept((V) valuesCopy[i]);
|
|
} catch (Exception ex) {
|
|
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
|
|
}
|
|
}
|
|
};
|
|
if (parallelExecutor != null) {
|
|
parallelExecutor.execute(action);
|
|
} else {
|
|
action.run();
|
|
}
|
|
} catch (RejectedExecutionException e) {
|
|
throw new CompletionException(e);
|
|
}
|
|
}
|
|
|
|
public static <K, V> void parallelize(Consumer<BiConsumer<K, V>> iterator,
|
|
int maxQueueSize,
|
|
int parallelism,
|
|
int groupSize, BiConsumer<K, V> consumer) throws CompletionException {
|
|
if (parallelism <= 1) {
|
|
iterator.accept(consumer);
|
|
} else {
|
|
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
|
|
parallelism,
|
|
0,
|
|
TimeUnit.MILLISECONDS,
|
|
new ShortNamedThreadFactory("ForEachParallel"),
|
|
(a, b) -> {}
|
|
);
|
|
final int CHUNK_SIZE = groupSize;
|
|
IntWrapper count = new IntWrapper(CHUNK_SIZE);
|
|
VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
|
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
|
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
|
|
final Object arraysAccessLock = new Object();
|
|
iterator.accept((key, value) -> {
|
|
synchronized (arraysAccessLock) {
|
|
var firstException = firstExceptionReference.get();
|
|
if (firstException != null) {
|
|
throw firstException;
|
|
}
|
|
|
|
keys.var[CHUNK_SIZE - count.var] = key;
|
|
values.var[CHUNK_SIZE - count.var] = value;
|
|
count.var--;
|
|
|
|
if (count.var == 0) {
|
|
sendChunkItems(keys, values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
|
|
}
|
|
}
|
|
});
|
|
parallelExecutor.shutdown();
|
|
try {
|
|
parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
|
|
} catch (InterruptedException e) {
|
|
throw new RuntimeException("Parallel forEach interrupted", e);
|
|
}
|
|
synchronized (arraysAccessLock) {
|
|
if (count.var > 0) {
|
|
sendChunkItems(keys, values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
|
|
}
|
|
}
|
|
|
|
var firstException = firstExceptionReference.get();
|
|
if (firstException != null) {
|
|
throw firstException;
|
|
}
|
|
}
|
|
}
|
|
|
|
private static <K, V> void sendChunkItems(VariableWrapper<Object[]> keys,
|
|
VariableWrapper<Object[]> values,
|
|
final int CHUNK_SIZE,
|
|
IntWrapper count,
|
|
BiConsumer<K, V> consumer,
|
|
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
|
|
AtomicReference<CompletionException> firstExceptionReference) {
|
|
int itemsCount = CHUNK_SIZE - count.var;
|
|
count.var = CHUNK_SIZE;
|
|
Object[] keysCopy = keys.var;
|
|
Object[] valuesCopy = values.var;
|
|
keys.var = new Object[itemsCount];
|
|
values.var = new Object[itemsCount];
|
|
try {
|
|
Runnable action = () -> {
|
|
for (int i = 0; i < itemsCount; i++) {
|
|
try {
|
|
//noinspection unchecked
|
|
consumer.accept((K) keysCopy[i], (V) valuesCopy[i]);
|
|
} catch (Exception ex) {
|
|
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
|
|
break;
|
|
}
|
|
}
|
|
};
|
|
if (parallelExecutor != null) {
|
|
parallelExecutor.execute(action);
|
|
} else {
|
|
action.run();
|
|
}
|
|
} catch (RejectedExecutionException e) {
|
|
throw new CompletionException(e);
|
|
}
|
|
}
|
|
|
|
public static <K1, K2, V> void parallelize(Consumer<TriConsumer<K1, K2, V>> iterator,
|
|
int maxQueueSize,
|
|
int parallelism,
|
|
int groupSize,
|
|
TriConsumer<K1, K2, V> consumer) throws CompletionException {
|
|
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
|
|
parallelism,
|
|
0,
|
|
TimeUnit.MILLISECONDS,
|
|
new ShortNamedThreadFactory("ForEachParallel"),
|
|
(a, b) -> {}
|
|
);
|
|
final int CHUNK_SIZE = groupSize;
|
|
IntWrapper count = new IntWrapper(CHUNK_SIZE);
|
|
VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
|
VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
|
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
|
|
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
|
|
final Object arraysAccessLock = new Object();
|
|
iterator.accept((key1, key2, value) -> {
|
|
synchronized (arraysAccessLock) {
|
|
var firstException = firstExceptionReference.get();
|
|
if (firstException != null) {
|
|
throw firstException;
|
|
}
|
|
|
|
keys1.var[CHUNK_SIZE - count.var] = key1;
|
|
keys2.var[CHUNK_SIZE - count.var] = key2;
|
|
values.var[CHUNK_SIZE - count.var] = value;
|
|
count.var--;
|
|
if (count.var == 0) {
|
|
sendChunkItems(keys1, keys2, values, CHUNK_SIZE, count, consumer, parallelExecutor, firstExceptionReference);
|
|
}
|
|
}
|
|
});
|
|
parallelExecutor.shutdown();
|
|
try {
|
|
parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
|
|
} catch (InterruptedException e) {
|
|
throw new RuntimeException("Parallel forEach interrupted", e);
|
|
}
|
|
synchronized (arraysAccessLock) {
|
|
if (count.var > 0) {
|
|
sendChunkItems(keys1, keys2, values, CHUNK_SIZE, count, consumer, null, firstExceptionReference);
|
|
}
|
|
}
|
|
|
|
var firstException = firstExceptionReference.get();
|
|
if (firstException != null) {
|
|
throw firstException;
|
|
}
|
|
}
|
|
|
|
private static <K1, K2, V> void sendChunkItems(VariableWrapper<Object[]> keys1,
|
|
VariableWrapper<Object[]> keys2,
|
|
VariableWrapper<Object[]> values,
|
|
int CHUNK_SIZE,
|
|
IntWrapper count,
|
|
TriConsumer<K1, K2, V> consumer,
|
|
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
|
|
AtomicReference<CompletionException> firstExceptionReference) {
|
|
int itemsCount = CHUNK_SIZE - count.var;
|
|
count.var = CHUNK_SIZE;
|
|
Object[] keys1Copy = keys1.var;
|
|
Object[] keys2Copy = keys2.var;
|
|
Object[] valuesCopy = values.var;
|
|
keys1.var = new Object[itemsCount];
|
|
keys2.var = new Object[itemsCount];
|
|
values.var = new Object[itemsCount];
|
|
try {
|
|
Runnable action = () -> {
|
|
for (int i = 0; i < itemsCount; i++) {
|
|
try {
|
|
//noinspection unchecked
|
|
consumer.accept((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]);
|
|
} catch (Exception ex) {
|
|
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
|
|
}
|
|
}
|
|
};
|
|
if (parallelExecutor != null) {
|
|
parallelExecutor.execute(action);
|
|
} else {
|
|
action.run();
|
|
}
|
|
} catch (RejectedExecutionException e) {
|
|
throw new CompletionException(e);
|
|
}
|
|
}
|
|
}
|