diff --git a/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java b/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java index 9b6758e..cfc74a6 100644 --- a/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java +++ b/src/main/java/org/warp/commonutils/concurrency/future/CompletableFutureUtils.java @@ -10,6 +10,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.function.Function; @@ -145,6 +146,12 @@ public class CompletableFutureUtils { futures.stream().map(CompletableFuture::join).forEachOrdered(consumer); } + public static void forEachOrderedSet(CompletableFuture>> futureList, + Consumer consumer) { + var futures = futureList.join(); + futures.stream().map(CompletableFuture::join).forEachOrdered(consumer); + } + public static void forEach(CompletableFuture>> futureList, Consumer consumer) { futureList.join().parallelStream().map(CompletableFuture::join).forEach(consumer); } @@ -177,6 +184,34 @@ public class CompletableFutureUtils { return result; } + /** + * Accept values synchronously from an async sequence + */ + public static CompletableFuture acceptSequenceAsync(Collection> collection, + Function> runner, + ExecutorService executorService) { + CompletableFuture result = CompletableFuture.completedFuture(null); + for (CompletableFuture completableFuture : collection) { + result = result.thenComposeAsync(x -> completableFuture.thenComposeAsync(runner::apply, executorService), + executorService + ); + } + return result; + } + + /** + * Accept values synchronously from an async sequence + */ + public static CompletableFuture acceptSequenceAsync(Collection> collection, + Consumer runner, + ExecutorService executorService) { + CompletableFuture result = CompletableFuture.completedFuture(null); + for (CompletableFuture completableFuture : collection) { + result = result.thenComposeAsync(x -> completableFuture.thenAcceptAsync(runner, executorService), executorService); + } + return result; + } + public static CompletableFuture applySequenceAsync(T initialValue, Collection>> collection, ExecutorService executorService) { var result = CompletableFuture.completedFuture(initialValue); for (Function> item : collection) {