From c64a272bd1aa56d1df82f1d872a504236c2bf558 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 17 Nov 2020 13:10:05 +0100 Subject: [PATCH] Update ParallelUtils.java and IOBiConsumer.java --- .../warp/commonutils/batch/ParallelUtils.java | 100 ++++++++++++++++++ .../commonutils/functional/IOBiConsumer.java | 8 ++ 2 files changed, 108 insertions(+) create mode 100644 src/main/java/org/warp/commonutils/functional/IOBiConsumer.java diff --git a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java index 388b539..d267c2f 100644 --- a/src/main/java/org/warp/commonutils/batch/ParallelUtils.java +++ b/src/main/java/org/warp/commonutils/batch/ParallelUtils.java @@ -1,5 +1,6 @@ package org.warp.commonutils.batch; +import java.io.IOException; import java.util.concurrent.CompletionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -8,6 +9,9 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import org.warp.commonutils.concurrency.executor.BlockingOnFullQueueExecutorServiceDecorator; import org.warp.commonutils.concurrency.executor.BoundedExecutorService; +import org.warp.commonutils.functional.IOBiConsumer; +import org.warp.commonutils.functional.IOConsumer; +import org.warp.commonutils.functional.IOTriConsumer; import org.warp.commonutils.functional.TriConsumer; import org.warp.commonutils.type.IntWrapper; import org.warp.commonutils.type.ShortNamedThreadFactory; @@ -15,6 +19,38 @@ import org.warp.commonutils.type.VariableWrapper; public class ParallelUtils { + public static void parallelizeIO(IOConsumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, + IOConsumer consumer) throws IOException { + Consumer> action = (cons) -> { + try { + iterator.consume(cons::accept); + } catch (IOException e) { + throw new CompletionException(e); + } + }; + + try { + parallelize(action, maxQueueSize, parallelism, groupSize, (v) -> { + try { + consumer.consume(v); + } catch (IOException ex) { + throw new CompletionException(ex); + } + }); + } catch (CompletionException ex) { + if (ex.getCause() instanceof CompletionException && ex.getCause().getCause() instanceof IOException) { + throw (IOException) ex.getCause().getCause(); + } else if (ex.getCause() instanceof IOException) { + throw (IOException) ex.getCause(); + } else { + throw new IOException(ex); + } + } + } + public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, @@ -94,6 +130,38 @@ public class ParallelUtils { } } + public static void parallelizeIO(IOConsumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, + IOBiConsumer consumer) throws IOException { + Consumer> action = (cons) -> { + try { + iterator.consume(cons::accept); + } catch (IOException e) { + throw new CompletionException(e); + } + }; + + try { + parallelize(action, maxQueueSize, parallelism, groupSize, (k, v) -> { + try { + consumer.consume(k, v); + } catch (IOException ex) { + throw new CompletionException(ex); + } + }); + } catch (CompletionException ex) { + if (ex.getCause() instanceof CompletionException && ex.getCause().getCause() instanceof IOException) { + throw (IOException) ex.getCause().getCause(); + } else if (ex.getCause() instanceof IOException) { + throw (IOException) ex.getCause(); + } else { + throw new IOException(ex); + } + } + } + public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, @@ -184,6 +252,38 @@ public class ParallelUtils { } } + public static void parallelizeIO(IOConsumer> iterator, + int maxQueueSize, + int parallelism, + int groupSize, + IOTriConsumer consumer) throws IOException { + Consumer> action = (cons) -> { + try { + iterator.consume(cons::accept); + } catch (IOException e) { + throw new CompletionException(e); + } + }; + + try { + parallelize(action, maxQueueSize, parallelism, groupSize, (k1, k2, v) -> { + try { + consumer.accept(k1, k2, v); + } catch (IOException ex) { + throw new CompletionException(ex); + } + }); + } catch (CompletionException ex) { + if (ex.getCause() instanceof CompletionException && ex.getCause().getCause() instanceof IOException) { + throw (IOException) ex.getCause().getCause(); + } else if (ex.getCause() instanceof IOException) { + throw (IOException) ex.getCause(); + } else { + throw new IOException(ex); + } + } + } + public static void parallelize(Consumer> iterator, int maxQueueSize, int parallelism, diff --git a/src/main/java/org/warp/commonutils/functional/IOBiConsumer.java b/src/main/java/org/warp/commonutils/functional/IOBiConsumer.java new file mode 100644 index 0000000..096b2d5 --- /dev/null +++ b/src/main/java/org/warp/commonutils/functional/IOBiConsumer.java @@ -0,0 +1,8 @@ +package org.warp.commonutils.functional; + +import java.io.IOException; + +public interface IOBiConsumer { + + void consume(T t, U u) throws IOException; +}