diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index a03538a..987ec9f 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -21,6 +21,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; @@ -43,6 +44,8 @@ public class StreamUtils { public static final ForkJoinPool LUCENE_SCHEDULER = newNamedForkJoinPool("Lucene"); + public static final ForkJoinPool GRAPH_SCHEDULER = newNamedForkJoinPool("Graph"); + public static final ForkJoinPool ROCKSDB_SCHEDULER = newNamedForkJoinPool("RocksDB"); private static final Collector TO_LIST_FAKE_COLLECTOR = new FakeCollector(); @@ -63,12 +66,17 @@ public class StreamUtils { public static ForkJoinPool newNamedForkJoinPool(String name) { final int MAX_CAP = 0x7fff; // max #workers - 1 - return new ForkJoinPool(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), pool -> { - var worker = defaultForkJoinWorkerThreadFactory.newThread(pool); - worker.setName("ForkJoinPool-" + name + "-worker-" + worker.getPoolIndex()); - return worker; - }, null, false, - 0, MAX_CAP, 1, null, 60_000L, TimeUnit.MILLISECONDS + return new ForkJoinPool( + Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), + new NamedForkJoinWorkerThreadFactory(name), + null, + true, + 0, + MAX_CAP, + 1, + null, + 60_000L, + TimeUnit.MILLISECONDS ); } @@ -254,6 +262,14 @@ public class StreamUtils { LongAdder sum = new LongAdder(); ((Stream) stream).forEach(sum::add); return (R) (Long) sum.sum(); + } else if (collector.getClass() == CountingExecutingCollector.class) { + LongAdder sum = new LongAdder(); + var consumer = ((CountingExecutingCollector) collector).getConsumer(); + stream.forEach(v -> { + sum.increment(); + consumer.accept(v); + }); + return (R) (Long) sum.sum(); } else if (collector.getClass() == ExecutingCollector.class) { stream.forEach(((ExecutingCollector) collector).getConsumer()); return null; @@ -270,6 +286,10 @@ public class StreamUtils { return new ExecutingCollector<>(consumer); } + public static Collector countingExecuting(Consumer consumer) { + return new CountingExecutingCollector<>(consumer); + } + public static Collector iterating(Consumer consumer) { return new IteratingCollector<>(consumer); } @@ -375,7 +395,7 @@ public class StreamUtils { } } - private abstract static sealed class AbstractExecutingCollector implements Collector { + private abstract static sealed class AbstractExecutingCollector implements Collector { private final Consumer consumer; @@ -399,16 +419,14 @@ public class StreamUtils { } @Override - public Function finisher() { - return FINISHER; - } + public abstract Function finisher(); public Consumer getConsumer() { return consumer; } } - private static final class ExecutingCollector extends AbstractExecutingCollector { + private static final class ExecutingCollector extends AbstractExecutingCollector { public ExecutingCollector(Consumer consumer) { super(consumer); @@ -418,14 +436,41 @@ public class StreamUtils { public Set characteristics() { return CH_CONCURRENT_NOID; } + + @Override + public Function finisher() { + return FINISHER; + } } - private static final class IteratingCollector extends AbstractExecutingCollector { + private static final class CountingExecutingCollector extends AbstractExecutingCollector { + + public CountingExecutingCollector(Consumer consumer) { + super(consumer); + } + + @Override + public Function finisher() { + throw new UnsupportedOperationException("This is a custom collector, do not use with the regular stream api"); + } + + @Override + public Set characteristics() { + return CH_CONCURRENT_NOID; + } + } + + private static final class IteratingCollector extends AbstractExecutingCollector { public IteratingCollector(Consumer consumer) { super(consumer); } + @Override + public Function finisher() { + return FINISHER; + } + @Override public Set characteristics() { return CH_NOID; @@ -499,4 +544,21 @@ public class StreamUtils { return CH_CONCURRENT_NOID; } } + + private static class NamedForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { + + private final AtomicInteger nextWorkerId = new AtomicInteger(0); + private final String name; + + public NamedForkJoinWorkerThreadFactory(String name) { + this.name = name; + } + + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + worker.setName("ForkJoinPool-" + name + "-worker-" + nextWorkerId.getAndIncrement()); + return worker; + } + } }