Async sequence

This commit is contained in:
Andrea Cavalli 2020-07-26 03:17:36 +02:00
parent ecd8974c56
commit fed608d6fa

View File

@ -10,6 +10,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ -145,6 +146,12 @@ public class CompletableFutureUtils {
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer); futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
} }
public static <T> void forEachOrderedSet(CompletableFuture<LinkedHashSet<CompletableFuture<T>>> futureList,
Consumer<T> consumer) {
var futures = futureList.join();
futures.stream().map(CompletableFuture::join).forEachOrdered(consumer);
}
public static <T> void forEach(CompletableFuture<List<CompletableFuture<T>>> futureList, Consumer<T> consumer) { public static <T> void forEach(CompletableFuture<List<CompletableFuture<T>>> futureList, Consumer<T> consumer) {
futureList.join().parallelStream().map(CompletableFuture::join).forEach(consumer); futureList.join().parallelStream().map(CompletableFuture::join).forEach(consumer);
} }
@ -177,6 +184,34 @@ public class CompletableFutureUtils {
return result; return result;
} }
/**
* Accept values synchronously from an async sequence
*/
public static <T> CompletableFuture<?> acceptSequenceAsync(Collection<CompletableFuture<T>> collection,
Function<T, CompletionStage<?>> runner,
ExecutorService executorService) {
CompletableFuture<?> result = CompletableFuture.completedFuture(null);
for (CompletableFuture<T> completableFuture : collection) {
result = result.thenComposeAsync(x -> completableFuture.thenComposeAsync(runner::apply, executorService),
executorService
);
}
return result;
}
/**
* Accept values synchronously from an async sequence
*/
public static <T> CompletableFuture<?> acceptSequenceAsync(Collection<CompletableFuture<T>> collection,
Consumer<T> runner,
ExecutorService executorService) {
CompletableFuture<?> result = CompletableFuture.completedFuture(null);
for (CompletableFuture<T> completableFuture : collection) {
result = result.thenComposeAsync(x -> completableFuture.thenAcceptAsync(runner, executorService), executorService);
}
return result;
}
public static <T> CompletableFuture<T> applySequenceAsync(T initialValue, Collection<Function<T, CompletableFuture<T>>> collection, ExecutorService executorService) { public static <T> CompletableFuture<T> applySequenceAsync(T initialValue, Collection<Function<T, CompletableFuture<T>>> collection, ExecutorService executorService) {
var result = CompletableFuture.completedFuture(initialValue); var result = CompletableFuture.completedFuture(initialValue);
for (Function<T, CompletableFuture<T>> item : collection) { for (Function<T, CompletableFuture<T>> item : collection) {