diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index 4efa9f0..7923d2f 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -1,5 +1,7 @@ package it.cavallium.dbengine.utils; +import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory; + import com.google.common.collect.Iterators; import com.google.common.collect.Streams; import it.cavallium.dbengine.utils.PartitionByIntSpliterator.IntPartition; @@ -14,6 +16,9 @@ import java.util.Set; import java.util.Spliterator; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; @@ -32,8 +37,9 @@ import org.jetbrains.annotations.Nullable; public class StreamUtils { - public static final ForkJoinPool LUCENE_SCHEDULER = new ForkJoinPool(); - public static final ForkJoinPool ROCKSDB_SCHEDULER = new ForkJoinPool(); + public static final ForkJoinPool LUCENE_SCHEDULER = newNamedForkJoinPool("Lucene"); + + public static final ForkJoinPool ROCKSDB_SCHEDULER = newNamedForkJoinPool("RocksDB"); private static final Collector TO_LIST_FAKE_COLLECTOR = new FakeCollector(); private static final Collector COUNT_FAKE_COLLECTOR = new FakeCollector(); @@ -49,6 +55,17 @@ public class StreamUtils { private static final Function FINISHER = x -> null; private static final Collector SUMMING_LONG_COLLECTOR = new SummingLongCollector(); + public static ForkJoinPool newNamedForkJoinPool(String name) { + final int MAX_CAP = 0x7fff; // max #workers - 1 + return new ForkJoinPool(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), pool -> { + var worker = defaultForkJoinWorkerThreadFactory.newThread(pool); + worker.setName("ForkJoinPool-" + name + "-worker-" + worker.getPoolIndex()); + return worker; + }, null, false, + 0, MAX_CAP, 1, null, 60_000L, TimeUnit.MILLISECONDS + ); + } + public static Collector> fastListing() { //noinspection unchecked return (Collector>) TO_LIST_FAKE_COLLECTOR;