414 lines
17 KiB
Java
414 lines
17 KiB
Java
package org.warp.commonutils.concurrency.future;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Comparator;
|
|
import java.util.LinkedHashSet;
|
|
import java.util.List;
|
|
import java.util.Optional;
|
|
import java.util.PriorityQueue;
|
|
import java.util.Set;
|
|
import java.util.TreeSet;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletionStage;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.function.Consumer;
|
|
import java.util.function.Function;
|
|
import java.util.function.Supplier;
|
|
import java.util.stream.Collectors;
|
|
import org.warp.commonutils.functional.BiCompletableFunction;
|
|
import org.warp.commonutils.functional.CompletableFunction;
|
|
import org.warp.commonutils.functional.IOCompletableFunction;
|
|
import org.warp.commonutils.functional.IOSupplier;
|
|
import org.warp.commonutils.functional.TriCompletableFunction;
|
|
import org.warp.commonutils.type.FloatPriorityQueue;
|
|
import org.warp.commonutils.type.ScoredValue;
|
|
|
|
public class CompletableFutureUtils {
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <T> CompletableFuture<T> getCompletableFuture(Supplier<CompletableFuture<T>> completableFutureSupplier) {
|
|
CompletableFuture<T> cf;
|
|
try {
|
|
cf = completableFutureSupplier.get();
|
|
} catch (Exception ex) {
|
|
cf = CompletableFuture.failedFuture(ex);
|
|
}
|
|
return cf;
|
|
}
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <F, T> CompletableFuture<T> getCompletableFuture(CompletableFunction<F, T> completableFutureFunction, F value) {
|
|
return getCompletableFuture(() -> completableFutureFunction.apply(value));
|
|
}
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <F, T> CompletableFuture<T> getCompletableFutureSupply(CompletableFunction<F, T> completableFutureFunction, Supplier<F> valueSupplier) {
|
|
return getCompletableFuture(() -> completableFutureFunction.apply(valueSupplier.get()));
|
|
}
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <F1, F2, T> CompletableFuture<T> getCompletableFuture(BiCompletableFunction<F1, F2, T> completableFutureFunction, F1 value1, F2 value2) {
|
|
return getCompletableFuture(() -> completableFutureFunction.apply(value1, value2));
|
|
}
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <F1, F2, T> CompletableFuture<T> getCompletableFutureSupply(BiCompletableFunction<F1, F2, T> completableFutureFunction, Supplier<F1> value1Supplier, Supplier<F2> value2Supplier) {
|
|
return getCompletableFuture(() -> completableFutureFunction.apply(value1Supplier.get(), value2Supplier.get()));
|
|
}
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <F1, F2, F3, T> CompletableFuture<T> getCompletableFuture(TriCompletableFunction<F1, F2, F3, T> completableFutureFunction, F1 value1, F2 value2, F3 value3) {
|
|
return getCompletableFuture(() -> completableFutureFunction.apply(value1, value2, value3));
|
|
}
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <F1, F2, F3, T> CompletableFuture<T> getCompletableFutureSupply(TriCompletableFunction<F1, F2, F3, T> completableFutureFunction, Supplier<F1> value1Supplier, Supplier<F2> value2Supplier, Supplier<F3> value3Supplier) {
|
|
return getCompletableFuture(() -> completableFutureFunction.apply(value1Supplier.get(), value2Supplier.get(), value3Supplier.get()));
|
|
}
|
|
|
|
////
|
|
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <T> CompletableFuture<T> getCompletableFutureIO(IOSupplier<CompletableFuture<T>> completableFutureSupplier) {
|
|
CompletableFuture<T> cf;
|
|
try {
|
|
cf = completableFutureSupplier.get();
|
|
} catch (Exception ex) {
|
|
cf = CompletableFuture.failedFuture(ex);
|
|
}
|
|
return cf;
|
|
}
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <F, T> CompletableFuture<T> getCompletableFutureIO(IOCompletableFunction<F, T> completableFutureFunction, F value) {
|
|
return getCompletableFutureIO(() -> completableFutureFunction.apply(value));
|
|
}
|
|
|
|
/**
|
|
* Safely get a CompletableFuture or a FailedFuture
|
|
*/
|
|
public static <F, T> CompletableFuture<T> getCompletableFutureIOSupply(IOCompletableFunction<F, T> completableFutureFunction, IOSupplier<F> valueSupplier) {
|
|
return getCompletableFutureIO(() -> completableFutureFunction.apply(valueSupplier.get()));
|
|
}
|
|
|
|
/**
|
|
* Aggregate multiple {@link CompletableFuture} lists into a single {@link CompletableFuture} list
|
|
*
|
|
* @param futureLists A collection of {@link CompletableFuture} lists.
|
|
* @param <T> List elements type
|
|
* @return {@link CompletableFuture} list
|
|
*/
|
|
public static <T> CompletableFuture<List<T>> aggregateList(Collection<CompletableFuture<List<T>>> futureLists) {
|
|
final CompletableFuture<List<T>> identityAggregatedResult = CompletableFuture.completedFuture(new ArrayList<T>());
|
|
|
|
return futureLists.parallelStream().reduce(identityAggregatedResult, (currentAggregatedResult, futureList) -> {
|
|
return currentAggregatedResult.thenApplyAsync((aggregatedList) -> {
|
|
aggregatedList.addAll(futureList.join());
|
|
return aggregatedList;
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Creates a new empty collection of disaggregated future results future lists
|
|
*/
|
|
public static <T> Collection<CompletableFuture<List<CompletableFuture<T>>>> createDisaggregatedResultsList() {
|
|
return new ArrayList<>(10);
|
|
}
|
|
|
|
/**
|
|
* Add a
|
|
* @param disaggregatedResults
|
|
* @param result
|
|
* @param <T>
|
|
*/
|
|
public static <T> void addDisaggregatedList(
|
|
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
|
|
CompletableFuture<List<CompletableFuture<T>>> result) {
|
|
disaggregatedResults.add(result);
|
|
}
|
|
|
|
/**
|
|
* Add a result
|
|
*/
|
|
public static <T, U extends T> void addDisaggregatedListCast(
|
|
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
|
|
CompletableFuture<List<CompletableFuture<U>>> result) {
|
|
addDisaggregatedListCastForced(disaggregatedResults, result);
|
|
}
|
|
|
|
public static <T, U> void addDisaggregatedListCastForced(
|
|
Collection<CompletableFuture<List<CompletableFuture<T>>>> disaggregatedResults,
|
|
CompletableFuture<List<CompletableFuture<U>>> result) {
|
|
disaggregatedResults.add(result.thenApply((originalList) -> {
|
|
List<CompletableFuture<T>> resultList = new ArrayList<>();
|
|
for (CompletableFuture<U> originalFuture : originalList) {
|
|
resultList.add(originalFuture.thenApply((originalValue) -> {
|
|
//noinspection unchecked
|
|
return (T) originalValue;
|
|
}));
|
|
}
|
|
return resultList;
|
|
}));
|
|
}
|
|
|
|
/**
|
|
* Aggregate multiple {@link CompletableFuture} lists into a single {@link CompletableFuture} list
|
|
*
|
|
* @param futureFloatPriorityQueues A collection of {@link CompletableFuture} lists.
|
|
* @param <T> List elements type
|
|
* @return {@link CompletableFuture} list
|
|
*/
|
|
public static <T> CompletableFuture<FloatPriorityQueue<T>> aggregatePq(Collection<CompletableFuture<FloatPriorityQueue<T>>> futureFloatPriorityQueues) {
|
|
final CompletableFuture<FloatPriorityQueue<T>> identityAggregatedResult = CompletableFuture.completedFuture(new FloatPriorityQueue<>());
|
|
|
|
return futureFloatPriorityQueues.stream().reduce(identityAggregatedResult, (currentAggregatedResult, futureFloatPriorityQueue) -> {
|
|
return currentAggregatedResult.thenApply((aggregatedFloatPriorityQueue) -> {
|
|
var futureFloatPriorityQueueValues = futureFloatPriorityQueue.join();
|
|
if (futureFloatPriorityQueueValues == aggregatedFloatPriorityQueue) {
|
|
return aggregatedFloatPriorityQueue;
|
|
}
|
|
futureFloatPriorityQueueValues.forEachItem(aggregatedFloatPriorityQueue::offer);
|
|
return aggregatedFloatPriorityQueue;
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Creates a new empty collection of disaggregated future results future lists
|
|
*/
|
|
public static <T> Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> createDisaggregatedResultsPq() {
|
|
return FloatPriorityQueue.synchronizedPq(10);
|
|
}
|
|
|
|
/**
|
|
* Add a
|
|
* @param disaggregatedResults
|
|
* @param result
|
|
* @param <T>
|
|
*/
|
|
public static <T> void addDisaggregatedPq(
|
|
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
|
|
CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>> result) {
|
|
disaggregatedResults.add(result);
|
|
}
|
|
|
|
/**
|
|
* Add a result
|
|
*/
|
|
public static <T, U extends T> void addDisaggregatedPqCast(
|
|
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
|
|
CompletableFuture<FloatPriorityQueue<CompletableFuture<U>>> result) {
|
|
addDisaggregatedPqCastForced(disaggregatedResults, result);
|
|
}
|
|
|
|
public static <T, U> void addDisaggregatedPqCastForced(
|
|
Collection<CompletableFuture<FloatPriorityQueue<CompletableFuture<T>>>> disaggregatedResults,
|
|
CompletableFuture<FloatPriorityQueue<CompletableFuture<U>>> result) {
|
|
disaggregatedResults.add(result.thenApply((originalFloatPriorityQueue) -> {
|
|
FloatPriorityQueue<CompletableFuture<T>> resultFloatPriorityQueue = new FloatPriorityQueue<>();
|
|
originalFloatPriorityQueue.forEachItem((originalFuture) -> {
|
|
resultFloatPriorityQueue.offer(ScoredValue.of(originalFuture.getScore(),
|
|
originalFuture.getValue().thenApply((originalValue) -> {
|
|
//noinspection unchecked
|
|
return (T) originalValue;
|
|
})
|
|
));
|
|
});
|
|
return resultFloatPriorityQueue;
|
|
}));
|
|
}
|
|
|
|
public static <T> Set<T> collectToSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
|
|
return futureList.join().parallelStream().map(CompletableFuture::join).collect(Collectors.toSet());
|
|
}
|
|
|
|
public static <T> Set<T> collectToSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
|
|
return futureList.join().parallelStream().map(CompletableFuture::join).limit(10).collect(Collectors.toSet());
|
|
}
|
|
|
|
public static <T> List<T> collectToList(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
|
|
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toList());
|
|
}
|
|
|
|
public static <T> List<T> collectToList(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
|
|
return futureList.join().stream().map(CompletableFuture::join).limit(limit).collect(Collectors.toList());
|
|
}
|
|
|
|
public static <T> LinkedHashSet<T> collectToLinkedSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
|
|
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(LinkedHashSet::new));
|
|
}
|
|
|
|
public static <T> LinkedHashSet<T> collectToLinkedSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList,
|
|
int limit) {
|
|
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
|
|
.collect(Collectors.toCollection(LinkedHashSet::new));
|
|
}
|
|
|
|
public static <T> FloatPriorityQueue<T> collectToPq(CompletableFuture<? extends FloatPriorityQueue<CompletableFuture<T>>> futureList) {
|
|
var internalPq = futureList.join().streamItems().map(t -> {
|
|
if (t.getValue() != null) {
|
|
return ScoredValue.of(t.getScore(), t.getValue().join());
|
|
} else {
|
|
return ScoredValue.of(t.getScore(), (T) null);
|
|
}
|
|
}).collect(Collectors.toCollection(PriorityQueue::new));
|
|
return new FloatPriorityQueue<>(internalPq);
|
|
}
|
|
|
|
public static <T> FloatPriorityQueue<T> collectToPq(CompletableFuture<? extends FloatPriorityQueue<CompletableFuture<T>>> futureList,
|
|
int limit) {
|
|
var internalPq = futureList.join().streamItems().map(t -> {
|
|
if (t.getValue() != null) {
|
|
return ScoredValue.of(t.getScore(), t.getValue().join());
|
|
} else {
|
|
return ScoredValue.of(t.getScore(), (T) null);
|
|
}
|
|
}).limit(limit).collect(Collectors.toCollection(PriorityQueue::new));
|
|
return new FloatPriorityQueue<>(internalPq);
|
|
}
|
|
|
|
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
|
|
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(TreeSet::new));
|
|
}
|
|
|
|
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, int limit) {
|
|
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
|
|
.collect(Collectors.toCollection(TreeSet::new));
|
|
}
|
|
|
|
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Comparator<T> comparator) {
|
|
return futureList.join().stream().map(CompletableFuture::join).collect(Collectors.toCollection(() -> new TreeSet<>(comparator)));
|
|
}
|
|
|
|
public static <T> TreeSet<T> collectToTreeSet(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Comparator<T> comparator, int limit) {
|
|
return futureList.join().stream().map(CompletableFuture::join).limit(limit)
|
|
.collect(Collectors.toCollection(() -> new TreeSet<>(comparator)));
|
|
}
|
|
|
|
public static <T> Optional<T> anyOrNull(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
|
|
return futureList.join().parallelStream().map(CompletableFuture::join).findAny();
|
|
}
|
|
|
|
public static <T> Optional<T> firstOrNull(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList) {
|
|
return futureList.join().stream().map(CompletableFuture::join).findFirst();
|
|
}
|
|
|
|
public static <T> void forEachOrdered(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList,
|
|
Consumer<T> consumer) {
|
|
var futures = futureList.join();
|
|
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
|
|
}
|
|
|
|
public static <T> void forEachOrdered(CompletableFuture<List<CompletableFuture<T>>> futureList,
|
|
Consumer<T> consumer, boolean reverse) {
|
|
var futures = futureList.join();
|
|
if (reverse) {
|
|
Collections.reverse(futures);
|
|
}
|
|
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
|
|
}
|
|
|
|
public static <T> void forEach(CompletableFuture<? extends Collection<CompletableFuture<T>>> futureList, Consumer<T> consumer) {
|
|
futureList.join().parallelStream().map(CompletableFuture::join).forEach(consumer);
|
|
}
|
|
|
|
/**
|
|
* Use CompletableFutureUtils.getCompletableFuture(supplier);
|
|
*/
|
|
@Deprecated
|
|
public static <T> CompletableFuture<T> catchUncheckedExceptions(Supplier<CompletableFuture<T>> supplier) {
|
|
return getCompletableFuture(supplier);
|
|
}
|
|
|
|
public static CompletableFuture<Void> runSequence(Collection<CompletableFuture<?>> collection) {
|
|
if (collection.isEmpty()) {
|
|
return CompletableFuture.completedFuture(null);
|
|
} else {
|
|
var result = new CompletableFuture<Void>();
|
|
for (CompletableFuture<?> completableFuture : collection) {
|
|
result = result.thenCompose(x -> completableFuture.thenRun(() -> {}));
|
|
}
|
|
return result;
|
|
}
|
|
}
|
|
|
|
public static CompletableFuture<Void> runSequenceAsync(Collection<CompletableFuture<?>> collection, ExecutorService executorService) {
|
|
var result = CompletableFuture.<Void>completedFuture(null);
|
|
for (CompletableFuture<?> completableFuture : collection) {
|
|
result = result.thenComposeAsync(x -> completableFuture.thenRun(() -> {}), executorService);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Accept values synchronously from an async sequence
|
|
*/
|
|
public static <T> CompletableFuture<?> acceptSequenceAsync(Collection<CompletableFuture<T>> collection,
|
|
Function<T, CompletionStage<?>> runner,
|
|
ExecutorService executorService) {
|
|
CompletableFuture<?> result = CompletableFuture.completedFuture(null);
|
|
for (CompletableFuture<T> completableFuture : collection) {
|
|
result = result.thenComposeAsync(x -> completableFuture.thenComposeAsync(runner::apply, executorService),
|
|
executorService
|
|
);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Accept values synchronously from an async sequence
|
|
*/
|
|
public static <T> CompletableFuture<?> acceptSequenceAsync(Collection<CompletableFuture<T>> collection,
|
|
Consumer<T> runner,
|
|
ExecutorService executorService) {
|
|
CompletableFuture<?> result = CompletableFuture.completedFuture(null);
|
|
for (CompletableFuture<T> completableFuture : collection) {
|
|
result = result.thenComposeAsync(x -> completableFuture.thenAcceptAsync(runner, executorService), executorService);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
public static <T> CompletableFuture<T> applySequenceAsync(T initialValue, Collection<Function<T, CompletableFuture<T>>> collection, ExecutorService executorService) {
|
|
var result = CompletableFuture.completedFuture(initialValue);
|
|
for (Function<T, CompletableFuture<T>> item : collection) {
|
|
result = result.thenComposeAsync(item, executorService);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
public static <U> CompletableFuture<U> composeAsync(
|
|
Supplier<? extends CompletionStage<U>> supp,
|
|
Executor executor) {
|
|
return CompletableFuture.completedFuture(null).thenComposeAsync((_x) -> supp.get(), executor);
|
|
}
|
|
|
|
public static <U> CompletableFuture<U> composeAsyncIO(
|
|
IOSupplier<CompletableFuture<U>> supp,
|
|
Executor executor) {
|
|
return CompletableFuture.completedFuture(null).thenComposeAsync((_x) -> getCompletableFutureIO(supp), executor);
|
|
}
|
|
}
|