common-utils/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java

408 lines
16 KiB
Java

package org.warp.commonutils.concurrency.future;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.warp.commonutils.functional.BiCompletableFunction;
import org.warp.commonutils.functional.CompletableFunction;
import org.warp.commonutils.functional.IOCompletableFunction;
import org.warp.commonutils.functional.IOSupplier;
import org.warp.commonutils.functional.TriCompletableFunction;
import org.warp.commonutils.type.FloatPriorityQueue;
import org.warp.commonutils.type.ScoredValue;
public class CompletableFutureUtils {
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <T> CompletableFuture<T> getCompletableFuture(Supplier<CompletableFuture<T>> completableFutureSupplier) {
CompletableFuture<T> cf;
try {
cf = completableFutureSupplier.get();
} catch (Exception ex) {
cf = CompletableFuture.failedFuture(ex);
}
return cf;
}
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <F, T> CompletableFuture<T> getCompletableFuture(CompletableFunction<F, T> completableFutureFunction, F value) {
return getCompletableFuture(() -> completableFutureFunction.apply(value));
}
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <F, T> CompletableFuture<T> getCompletableFutureSupply(CompletableFunction<F, T> completableFutureFunction, Supplier<F> valueSupplier) {
return getCompletableFuture(() -> completableFutureFunction.apply(valueSupplier.get()));
}
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <F1, F2, T> CompletableFuture<T> getCompletableFuture(BiCompletableFunction<F1, F2, T> completableFutureFunction, F1 value1, F2 value2) {
return getCompletableFuture(() -> completableFutureFunction.apply(value1, value2));
}
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <F1, F2, T> CompletableFuture<T> getCompletableFutureSupply(BiCompletableFunction<F1, F2, T> completableFutureFunction, Supplier<F1> value1Supplier, Supplier<F2> value2Supplier) {
return getCompletableFuture(() -> completableFutureFunction.apply(value1Supplier.get(), value2Supplier.get()));
}
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <F1, F2, F3, T> CompletableFuture<T> getCompletableFuture(TriCompletableFunction<F1, F2, F3, T> completableFutureFunction, F1 value1, F2 value2, F3 value3) {
return getCompletableFuture(() -> completableFutureFunction.apply(value1, value2, value3));
}
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <F1, F2, F3, T> CompletableFuture<T> getCompletableFutureSupply(TriCompletableFunction<F1, F2, F3, T> completableFutureFunction, Supplier<F1> value1Supplier, Supplier<F2> value2Supplier, Supplier<F3> value3Supplier) {
return getCompletableFuture(() -> completableFutureFunction.apply(value1Supplier.get(), value2Supplier.get(), value3Supplier.get()));
}
////
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <T> CompletableFuture<T> getCompletableFutureIO(IOSupplier<CompletableFuture<T>> completableFutureSupplier) {
CompletableFuture<T> cf;
try {
cf = completableFutureSupplier.get();
} catch (Exception ex) {
cf = CompletableFuture.failedFuture(ex);
}
return cf;
}
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <F, T> CompletableFuture<T> getCompletableFutureIO(IOCompletableFunction<F, T> completableFutureFunction, F value) {
return getCompletableFutureIO(() -> completableFutureFunction.apply(value));
}
/**
* Safely get a CompletableFuture or a FailedFuture
*/
public static <F, T> CompletableFuture<T> getCompletableFutureIOSupply(IOCompletableFunction<F, T> completableFutureFunction, IOSupplier<F> valueSupplier) {
return getCompletableFutureIO(() -> completableFutureFunction.apply(valueSupplier.get()));
}
/**
* Aggregate multiple {@link CompletableFuture} lists into a single {@link CompletableFuture} list
*
* @param futureLists A collection of {@link CompletableFuture} lists.
* @param <T> List elements type
* @return {@link CompletableFuture} list
*/
public static <T> CompletableFuture<List<T>> aggregateList(Collection<CompletableFuture<List<T>>> futureLists) {
final CompletableFuture<List<T>> identityAggregatedResult = CompletableFuture.completedFuture(new ArrayList<T>());
return futureLists.parallelStream().reduce(identityAggregatedResult, (currentAggregatedResult, futureList) -> {
return currentAggregatedResult.thenApplyAsync((aggregatedList) -> {
aggregatedList.addAll(futureList.join());
return aggregatedList;
});
});
}
/**
* Creates a new empty collection of disaggregated future results future lists
*/
public static <T> Collection<CompletableFuture<List<CompletableFuture<T>>>> createDisaggregatedResultsList() {
return new ArrayList<>(10);
}
/**
* Add a
* @param disaggregatedResults
* @param result
* @param <T>
*/
public static <T> void addDisaggregatedList(
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<List<CompletableFuture<T>>> result) {
disaggregatedResults.add(result);
}
/**
* Add a result
*/
public static <T, U extends T> void addDisaggregatedListCast(
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<List<CompletableFuture<U>>> result) {
addDisaggregatedListCastForced(disaggregatedResults, result);
}
public static <T, U> void addDisaggregatedListCastForced(
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<List<CompletableFuture<U>>> result) {
disaggregatedResults.add(result.thenApply((originalList) -> {
List<CompletableFuture<T>> resultList = new ArrayList<>();
for (CompletableFuture<U> originalFuture : originalList) {
resultList.add(originalFuture.thenApply((originalValue) -> {
//noinspection unchecked
return (T) originalValue;
}));
}
return resultList;
}));
}
/**
* Aggregate multiple {@link CompletableFuture} lists into a single {@link CompletableFuture} list
*
* @param futureFloatPriorityQueues A collection of {@link CompletableFuture} lists.
* @param <T> List elements type
* @return {@link CompletableFuture} list
*/
public static <T> CompletableFuture<FloatPriorityQueue<T>> aggregatePq(Collection<CompletableFuture<FloatPriorityQueue<T>>> futureFloatPriorityQueues) {
final CompletableFuture<FloatPriorityQueue<T>> identityAggregatedResult = CompletableFuture.completedFuture(new FloatPriorityQueue<>());
return futureFloatPriorityQueues.parallelStream().reduce(identityAggregatedResult, (currentAggregatedResult, futureFloatPriorityQueue) -> {
return currentAggregatedResult.thenApplyAsync((aggregatedFloatPriorityQueue) -> {
var futureFloatPriorityQueueValues = futureFloatPriorityQueue.join();
if (futureFloatPriorityQueueValues == aggregatedFloatPriorityQueue) {
return aggregatedFloatPriorityQueue;
}
futureFloatPriorityQueueValues.forEachItem(aggregatedFloatPriorityQueue::offer);
return aggregatedFloatPriorityQueue;
});
});
}
/**
* Creates a new empty collection of disaggregated future results future lists
*/
public static <T> Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> createDisaggregatedResultsPq() {
return FloatPriorityQueue.synchronizedPq(10);
}
/**
* Add a
* @param disaggregatedResults
* @param result
* @param <T>
*/
public static <T> void addDisaggregatedPq(
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>> result) {
disaggregatedResults.add(result);
}
/**
* Add a result
*/
public static <T, U extends T> void addDisaggregatedPqCast(
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<FloatPriorityQueue<CompletableFuture<U>>> result) {
addDisaggregatedPqCastForced(disaggregatedResults, result);
}
public static <T, U> void addDisaggregatedPqCastForced(
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
CompletableFuture<FloatPriorityQueue<CompletableFuture<U>>> result) {
disaggregatedResults.add(result.thenApply((originalFloatPriorityQueue) -> {
FloatPriorityQueue<CompletableFuture<T>> resultFloatPriorityQueue = new FloatPriorityQueue<>();
originalFloatPriorityQueue.forEachItem((originalFuture) -> {
resultFloatPriorityQueue.offer(ScoredValue.of(originalFuture.getScore(),
originalFuture.getValue().thenApply((originalValue) -> {
//noinspection unchecked
return (T) originalValue;
})
));
});
return resultFloatPriorityQueue;
}));
}
public static <T> Set<T> collectToSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().parallelStream().map(CompletableFuture::join).collect(Collectors.toSet());
}
public static <T> Set<T> collectToSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
return futureList.join().parallelStream().map(CompletableFuture::join).limit(10).collect(Collectors.toSet());
}
public static <T> List<T> collectToList(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toList());
}
public static <T> List<T> collectToList(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
return futureList.join().stream().map(CompletableFuture::join).limit(limit).collect(Collectors.toList());
}
public static <T> LinkedHashSet<T> collectToLinkedSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(LinkedHashSet::new));
}
public static <T> LinkedHashSet<T> collectToLinkedSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList,
int limit) {
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
public static <T> FloatPriorityQueue<T> collectToPq(CompletableFuture<? extends FloatPriorityQueue<CompletableFuture<T>>> futureList) {
var internalPq = futureList.join().streamItems().map(t -> {
if (t.getValue() != null) {
return ScoredValue.of(t.getScore(), t.getValue().join());
} else {
return ScoredValue.of(t.getScore(), (T) null);
}
}).collect(Collectors.toCollection(PriorityQueue::new));
return new FloatPriorityQueue<>(internalPq);
}
public static <T> FloatPriorityQueue<T> collectToPq(CompletableFuture<? extends FloatPriorityQueue<CompletableFuture<T>>> futureList,
int limit) {
var internalPq = futureList.join().streamItems().map(t -> {
if (t.getValue() != null) {
return ScoredValue.of(t.getScore(), t.getValue().join());
} else {
return ScoredValue.of(t.getScore(), (T) null);
}
}).limit(limit).collect(Collectors.toCollection(PriorityQueue::new));
return new FloatPriorityQueue<>(internalPq);
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(TreeSet::new));
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
.collect(Collectors.toCollection(TreeSet::new));
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Comparator<T> comparator) {
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(() -> new TreeSet<>(comparator)));
}
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Comparator<T> comparator, int limit) {
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
.collect(Collectors.toCollection(() -> new TreeSet<>(comparator)));
}
public static <T> Optional<T> anyOrNull(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().parallelStream().map(CompletableFuture::join).findAny();
}
public static <T> Optional<T> firstOrNull(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
return futureList.join().stream().map(CompletableFuture::join).findFirst();
}
public static <T> void forEachOrdered(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList,
Consumer<T> consumer) {
var futures = futureList.join();
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
}
public static <T> void forEachOrdered(CompletableFuture<List<CompletableFuture<T>>> futureList,
Consumer<T> consumer, boolean reverse) {
var futures = futureList.join();
if (reverse) {
Collections.reverse(futures);
}
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
}
public static <T> void forEach(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Consumer<T> consumer) {
futureList.join().parallelStream().map(CompletableFuture::join).forEach(consumer);
}
/**
* Use CompletableFutureUtils.getCompletableFuture(supplier);
*/
@Deprecated
public static <T> CompletableFuture<T> catchUncheckedExceptions(Supplier<CompletableFuture<T>> supplier) {
return getCompletableFuture(supplier);
}
public static CompletableFuture<Void> runSequence(Collection<CompletableFuture<?>> collection) {
if (collection.isEmpty()) {
return CompletableFuture.completedFuture(null);
} else {
var result = new CompletableFuture<Void>();
for (CompletableFuture<?> completableFuture : collection) {
result = result.thenCompose(x -> completableFuture.thenRun(() -> {}));
}
return result;
}
}
public static CompletableFuture<Void> runSequenceAsync(Collection<CompletableFuture<?>> collection, ExecutorService executorService) {
var result = CompletableFuture.<Void>completedFuture(null);
for (CompletableFuture<?> completableFuture : collection) {
result = result.thenComposeAsync(x -> completableFuture.thenRun(() -> {}), executorService);
}
return result;
}
/**
* Accept values synchronously from an async sequence
*/
public static <T> CompletableFuture<?> acceptSequenceAsync(Collection<CompletableFuture<T>> collection,
Function<T, CompletionStage<?>> runner,
ExecutorService executorService) {
CompletableFuture<?> result = CompletableFuture.completedFuture(null);
for (CompletableFuture<T> completableFuture : collection) {
result = result.thenComposeAsync(x -> completableFuture.thenComposeAsync(runner::apply, executorService),
executorService
);
}
return result;
}
/**
* Accept values synchronously from an async sequence
*/
public static <T> CompletableFuture<?> acceptSequenceAsync(Collection<CompletableFuture<T>> collection,
Consumer<T> runner,
ExecutorService executorService) {
CompletableFuture<?> result = CompletableFuture.completedFuture(null);
for (CompletableFuture<T> completableFuture : collection) {
result = result.thenComposeAsync(x -> completableFuture.thenAcceptAsync(runner, executorService), executorService);
}
return result;
}
public static <T> CompletableFuture<T> applySequenceAsync(T initialValue, Collection<Function<T, CompletableFuture<T>>> collection, ExecutorService executorService) {
var result = CompletableFuture.completedFuture(initialValue);
for (Function<T, CompletableFuture<T>> item : collection) {
result = result.thenComposeAsync(item, executorService);
}
return result;
}
public static <U> CompletableFuture<U> composeAsync(
Supplier<? extends CompletionStage<U>> supp,
Executor executor) {
return CompletableFuture.completedFuture(null).thenComposeAsync((_x) -> supp.get(), executor);
}
}