Add cancellable consumers

This commit is contained in:
Andrea Cavalli 2021-01-17 18:29:51 +01:00
parent 9f179ab32c
commit 97b7246ead
9 changed files with 610 additions and 0 deletions

View File

@ -4,11 +4,17 @@ import java.io.IOException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.warp.commonutils.concurrency.executor.BlockingOnFullQueueExecutorServiceDecorator;
import org.warp.commonutils.concurrency.executor.BoundedExecutorService;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableConsumer;
import org.warp.commonutils.functional.CancellableTriConsumer;
import org.warp.commonutils.functional.ConsumerResult;
import org.warp.commonutils.functional.IOBiConsumer;
import org.warp.commonutils.functional.IOConsumer;
import org.warp.commonutils.functional.IOTriConsumer;
@ -130,6 +136,126 @@ public class ParallelUtils {
}
}
public static <K> ConsumerResult parallelize(Consumer<CancellableConsumer<K>> iterator,
int maxQueueSize,
int parallelism,
int groupSize,
CancellableConsumer<K> consumer) throws CompletionException {
if (parallelism <= 1) {
iterator.accept(consumer);
return ConsumerResult.result();
} else {
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
parallelism,
0,
TimeUnit.MILLISECONDS,
new ShortNamedThreadFactory("ForEachParallel"),
(a, b) -> {}
);
final int CHUNK_SIZE = groupSize;
IntWrapper count = new IntWrapper(CHUNK_SIZE);
VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
AtomicBoolean cancelled = new AtomicBoolean(false);
final Object arraysAccessLock = new Object();
iterator.accept((key) -> {
synchronized (arraysAccessLock) {
var firstException = firstExceptionReference.get();
if (firstException != null) {
throw firstException;
}
var cancelledVal = cancelled.get();
if (cancelledVal) {
return ConsumerResult.cancelNext();
}
keys.var[CHUNK_SIZE - count.var] = key;
count.var--;
if (count.var == 0) {
return sendChunkItems(keys,
CHUNK_SIZE,
count,
consumer,
parallelExecutor,
firstExceptionReference,
cancelled
);
} else {
return ConsumerResult.result();
}
}
});
parallelExecutor.shutdown();
try {
parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
throw new RuntimeException("Parallel forEach interrupted", e);
}
synchronized (arraysAccessLock) {
if (count.var > 0) {
var sendChunkItemsResult = sendChunkItems(keys,
CHUNK_SIZE,
count,
consumer,
null,
firstExceptionReference,
cancelled
);
cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled());
}
}
var firstException = firstExceptionReference.get();
if (firstException != null) {
throw firstException;
}
if (cancelled.get()) {
return ConsumerResult.cancelNext();
} else {
return ConsumerResult.result();
}
}
}
private static <K> ConsumerResult sendChunkItems(VariableWrapper<Object[]> keys,
final int CHUNK_SIZE,
IntWrapper count,
CancellableConsumer<K> consumer,
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
AtomicReference<CompletionException> firstExceptionReference,
AtomicBoolean cancelled) {
int itemsCount = CHUNK_SIZE - count.var;
count.var = CHUNK_SIZE;
Object[] keysCopy = keys.var;
keys.var = new Object[itemsCount];
try {
Supplier<ConsumerResult> action = () -> {
for (int i = 0; i < itemsCount; i++) {
try {
//noinspection unchecked
if (consumer.acceptCancellable((K) keysCopy[i]).isCancelled()) {
cancelled.set(true);
return ConsumerResult.cancelNext();
}
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
return ConsumerResult.cancelNext();
}
}
return ConsumerResult.result();
};
if (parallelExecutor != null) {
parallelExecutor.execute(action::get);
return ConsumerResult.result();
} else {
return action.get();
}
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
}
}
public static <K, V> void parallelizeIO(IOConsumer<IOBiConsumer<K, V>> iterator,
int maxQueueSize,
int parallelism,
@ -252,6 +378,133 @@ public class ParallelUtils {
}
}
public static <K, V> ConsumerResult parallelize(Consumer<CancellableBiConsumer<K, V>> iterator,
int maxQueueSize,
int parallelism,
int groupSize,
CancellableBiConsumer<K, V> consumer) throws CompletionException {
if (parallelism <= 1) {
iterator.accept(consumer);
return ConsumerResult.result();
} else {
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
parallelism,
0,
TimeUnit.MILLISECONDS,
new ShortNamedThreadFactory("ForEachParallel"),
(a, b) -> {}
);
final int CHUNK_SIZE = groupSize;
IntWrapper count = new IntWrapper(CHUNK_SIZE);
VariableWrapper<Object[]> keys = new VariableWrapper<>(new Object[CHUNK_SIZE]);
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
AtomicBoolean cancelled = new AtomicBoolean(false);
final Object arraysAccessLock = new Object();
iterator.accept((key, value) -> {
synchronized (arraysAccessLock) {
var firstException = firstExceptionReference.get();
if (firstException != null) {
throw firstException;
}
var cancelledVal = cancelled.get();
if (cancelledVal) {
return ConsumerResult.cancelNext();
}
keys.var[CHUNK_SIZE - count.var] = key;
values.var[CHUNK_SIZE - count.var] = value;
count.var--;
if (count.var == 0) {
return sendChunkItems(keys,
values,
CHUNK_SIZE,
count,
consumer,
parallelExecutor,
firstExceptionReference,
cancelled
);
} else {
return ConsumerResult.result();
}
}
});
parallelExecutor.shutdown();
try {
parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
throw new RuntimeException("Parallel forEach interrupted", e);
}
synchronized (arraysAccessLock) {
if (count.var > 0) {
var sendChunkItemsResult = sendChunkItems(keys,
values,
CHUNK_SIZE,
count,
consumer,
null,
firstExceptionReference,
cancelled
);
cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled());
}
}
var firstException = firstExceptionReference.get();
if (firstException != null) {
throw firstException;
}
if (cancelled.get()) {
return ConsumerResult.cancelNext();
} else {
return ConsumerResult.result();
}
}
}
private static <K, V> ConsumerResult sendChunkItems(VariableWrapper<Object[]> keys,
VariableWrapper<Object[]> values,
final int CHUNK_SIZE,
IntWrapper count,
CancellableBiConsumer<K, V> consumer,
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
AtomicReference<CompletionException> firstExceptionReference,
AtomicBoolean cancelled) {
int itemsCount = CHUNK_SIZE - count.var;
count.var = CHUNK_SIZE;
Object[] keysCopy = keys.var;
Object[] valuesCopy = values.var;
keys.var = new Object[itemsCount];
values.var = new Object[itemsCount];
try {
Supplier<ConsumerResult> action = () -> {
for (int i = 0; i < itemsCount; i++) {
try {
//noinspection unchecked
if (consumer.acceptCancellable((K) keysCopy[i], (V) valuesCopy[i]).isCancelled()) {
cancelled.set(true);
return ConsumerResult.cancelNext();
}
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
return ConsumerResult.cancelNext();
}
}
return ConsumerResult.result();
};
if (parallelExecutor != null) {
parallelExecutor.execute(action::get);
return ConsumerResult.result();
} else {
return action.get();
}
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
}
}
public static <K1, K2, V> void parallelizeIO(IOConsumer<IOTriConsumer<K1, K2, V>> iterator,
int maxQueueSize,
int parallelism,
@ -373,4 +626,138 @@ public class ParallelUtils {
throw new CompletionException(e);
}
}
public static <K1, K2, V> ConsumerResult parallelize(Consumer<CancellableTriConsumer<K1, K2, V>> iterator,
int maxQueueSize,
int parallelism,
int groupSize,
CancellableTriConsumer<K1, K2, V> consumer) throws CompletionException {
if (parallelism <= 1) {
iterator.accept(consumer);
return ConsumerResult.result();
} else {
var parallelExecutor = BoundedExecutorService.create(maxQueueSize,
parallelism,
0,
TimeUnit.MILLISECONDS,
new ShortNamedThreadFactory("ForEachParallel"),
(a, b) -> {}
);
final int CHUNK_SIZE = groupSize;
IntWrapper count = new IntWrapper(CHUNK_SIZE);
VariableWrapper<Object[]> keys1 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
VariableWrapper<Object[]> keys2 = new VariableWrapper<>(new Object[CHUNK_SIZE]);
VariableWrapper<Object[]> values = new VariableWrapper<>(new Object[CHUNK_SIZE]);
AtomicReference<CompletionException> firstExceptionReference = new AtomicReference<>(null);
AtomicBoolean cancelled = new AtomicBoolean(false);
final Object arraysAccessLock = new Object();
iterator.accept((key1, key2, value) -> {
synchronized (arraysAccessLock) {
var firstException = firstExceptionReference.get();
if (firstException != null) {
throw firstException;
}
var cancelledVal = cancelled.get();
if (cancelledVal) {
return ConsumerResult.cancelNext();
}
keys1.var[CHUNK_SIZE - count.var] = key1;
keys2.var[CHUNK_SIZE - count.var] = key2;
values.var[CHUNK_SIZE - count.var] = value;
count.var--;
if (count.var == 0) {
return sendChunkItems(keys1,
keys2,
values,
CHUNK_SIZE,
count,
consumer,
parallelExecutor,
firstExceptionReference,
cancelled
);
} else {
return ConsumerResult.result();
}
}
});
parallelExecutor.shutdown();
try {
parallelExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
throw new RuntimeException("Parallel forEach interrupted", e);
}
synchronized (arraysAccessLock) {
if (count.var > 0) {
var sendChunkItemsResult = sendChunkItems(keys1,
keys2,
values,
CHUNK_SIZE,
count,
consumer,
null,
firstExceptionReference,
cancelled
);
cancelled.compareAndSet(false, sendChunkItemsResult.isCancelled());
}
}
var firstException = firstExceptionReference.get();
if (firstException != null) {
throw firstException;
}
if (cancelled.get()) {
return ConsumerResult.cancelNext();
} else {
return ConsumerResult.result();
}
}
}
private static <K1, K2, V> ConsumerResult sendChunkItems(VariableWrapper<Object[]> keys1,
VariableWrapper<Object[]> keys2,
VariableWrapper<Object[]> values,
final int CHUNK_SIZE,
IntWrapper count,
CancellableTriConsumer<K1, K2, V> consumer,
BlockingOnFullQueueExecutorServiceDecorator parallelExecutor,
AtomicReference<CompletionException> firstExceptionReference,
AtomicBoolean cancelled) {
int itemsCount = CHUNK_SIZE - count.var;
count.var = CHUNK_SIZE;
Object[] keys1Copy = keys1.var;
Object[] keys2Copy = keys2.var;
Object[] valuesCopy = values.var;
keys1.var = new Object[itemsCount];
keys2.var = new Object[itemsCount];
values.var = new Object[itemsCount];
try {
Supplier<ConsumerResult> action = () -> {
for (int i = 0; i < itemsCount; i++) {
try {
//noinspection unchecked
if (consumer.acceptCancellable((K1) keys1Copy[i], (K2) keys2Copy[i], (V) valuesCopy[i]).isCancelled()) {
cancelled.set(true);
return ConsumerResult.cancelNext();
}
} catch (Exception ex) {
firstExceptionReference.compareAndSet(null, new CompletionException(ex));
return ConsumerResult.cancelNext();
}
}
return ConsumerResult.result();
};
if (parallelExecutor != null) {
parallelExecutor.execute(action::get);
return ConsumerResult.result();
} else {
return action.get();
}
} catch (RejectedExecutionException e) {
throw new CompletionException(e);
}
}
}

View File

@ -0,0 +1,13 @@
package org.warp.commonutils.functional;
public interface CancellableBiConsumer<T, U> { //extends BiConsumer<T, U> {
/**
* @return false to cancel
*/
ConsumerResult acceptCancellable(T t, U u);
/*default void accept(T t, U u) {
acceptCancellable(t, u);
}*/
}

View File

@ -0,0 +1,16 @@
package org.warp.commonutils.functional;
public interface CancellableBiFunction<T, U, V> { //extends BiFunction<T, U, V> {
OperationResult<V> applyCancellable(T t, U u);
/* default V apply(T t, U u) {
var result = applyCancellable(t, u);
if (result == OperationResult.CANCEL) {
throw new UnsupportedOperationException("Can't cancel this operation");
}
//noinspection unchecked
return (V) result;
}
*/
}

View File

@ -0,0 +1,13 @@
package org.warp.commonutils.functional;
public interface CancellableConsumer<T> { //extends Consumer<T> {
/**
* @return false to cancel
*/
ConsumerResult acceptCancellable(T t);
/*default void accept(T t) {
acceptCancellable(t);
}*/
}

View File

@ -0,0 +1,15 @@
package org.warp.commonutils.functional;
public interface CancellableFunction<T, U> { //extends Function<T, U> {
OperationResult<U> applyCancellable(T t);
/*default U apply(T t) {
var result = applyCancellable(t);
if (result == OperationResult.CANCEL) {
throw new UnsupportedOperationException("Can't cancel this operation");
}
//noinspection unchecked
return (U) result;
}*/
}

View File

@ -0,0 +1,13 @@
package org.warp.commonutils.functional;
public interface CancellableTriConsumer<T, U, V> { //extends BiConsumer<T, U> {
/**
* @return false to cancel
*/
ConsumerResult acceptCancellable(T t, U u, V v);
/*default void accept(T t, U u) {
acceptCancellable(t, u);
}*/
}

View File

@ -0,0 +1,16 @@
package org.warp.commonutils.functional;
public interface CancellableTriFunction<T, U, V, W> { //extends BiFunction<T, U, V> {
OperationResult<W> applyCancellable(T t, U u, V v);
/* default V apply(T t, U u) {
var result = applyCancellable(t, u);
if (result == OperationResult.CANCEL) {
throw new UnsupportedOperationException("Can't cancel this operation");
}
//noinspection unchecked
return (V) result;
}
*/
}

View File

@ -0,0 +1,62 @@
package org.warp.commonutils.functional;
import java.util.StringJoiner;
import java.util.concurrent.CancellationException;
public final class ConsumerResult {
private final boolean cancel;
private ConsumerResult(boolean cancel) {
this.cancel = cancel;
}
public static ConsumerResult cancelNext() {
return new ConsumerResult(true);
}
public static ConsumerResult result() {
return new ConsumerResult(false);
}
public boolean isCancelled() {
return cancel;
}
public void throwIfCancelled() {
if (cancel) {
throw new CancellationException("Operation cancelled");
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ConsumerResult that = (ConsumerResult) o;
return cancel == that.cancel;
}
@Override
public int hashCode() {
return (cancel ? 1 : 0);
}
@Override
public String toString() {
return new StringJoiner(", ", ConsumerResult.class.getSimpleName() + "[", "]").add("cancel=" + cancel).toString();
}
public ConsumerResult or(ConsumerResult otherResult) {
if (otherResult.cancel) {
return otherResult;
}
return this;
}
}

View File

@ -0,0 +1,75 @@
package org.warp.commonutils.functional;
import java.util.Objects;
import java.util.StringJoiner;
public final class OperationResult<T> {
private final boolean cancel;
private final T value;
private OperationResult(boolean cancel, T value) {
this.cancel = cancel;
this.value = value;
}
public static <T> OperationResult<T> cancelNext(T value) {
return new OperationResult<>(true, value);
}
public static <T> OperationResult<T> result(T value) {
return new OperationResult<>(false, value);
}
public static <T> OperationResult<T> of(boolean cancel, T value) {
return new OperationResult<>(cancel, value);
}
public boolean isCancelled() {
return cancel;
}
public T getValue() {
return value;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OperationResult<?> that = (OperationResult<?>) o;
if (cancel != that.cancel) {
return false;
}
return Objects.equals(value, that.value);
}
@Override
public int hashCode() {
int result = (cancel ? 1 : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
@Override
public String toString() {
return new StringJoiner(", ", OperationResult.class.getSimpleName() + "[", "]")
.add("cancel=" + cancel)
.add("value=" + value)
.toString();
}
public <X> OperationResult<X> copyStatusWith(X newResults) {
if (cancel) {
return OperationResult.cancelNext(newResults);
} else {
return OperationResult.result(newResults);
}
}
}