Implement named fork join worker thread factory, count-exec collector

This commit is contained in:
Andrea Cavalli 2023-03-28 15:54:27 +02:00
parent 09113207ed
commit 8ac067b639
1 changed files with 74 additions and 12 deletions

View File

@ -21,6 +21,7 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
@ -43,6 +44,8 @@ public class StreamUtils {
public static final ForkJoinPool LUCENE_SCHEDULER = newNamedForkJoinPool("Lucene");
public static final ForkJoinPool GRAPH_SCHEDULER = newNamedForkJoinPool("Graph");
public static final ForkJoinPool ROCKSDB_SCHEDULER = newNamedForkJoinPool("RocksDB");
private static final Collector<?, ?, ?> TO_LIST_FAKE_COLLECTOR = new FakeCollector();
@ -63,12 +66,17 @@ public class StreamUtils {
public static ForkJoinPool newNamedForkJoinPool(String name) {
final int MAX_CAP = 0x7fff; // max #workers - 1
return new ForkJoinPool(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), pool -> {
var worker = defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("ForkJoinPool-" + name + "-worker-" + worker.getPoolIndex());
return worker;
}, null, false,
0, MAX_CAP, 1, null, 60_000L, TimeUnit.MILLISECONDS
return new ForkJoinPool(
Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
new NamedForkJoinWorkerThreadFactory(name),
null,
true,
0,
MAX_CAP,
1,
null,
60_000L,
TimeUnit.MILLISECONDS
);
}
@ -254,6 +262,14 @@ public class StreamUtils {
LongAdder sum = new LongAdder();
((Stream<Long>) stream).forEach(sum::add);
return (R) (Long) sum.sum();
} else if (collector.getClass() == CountingExecutingCollector.class) {
LongAdder sum = new LongAdder();
var consumer = ((CountingExecutingCollector<? super I>) collector).getConsumer();
stream.forEach(v -> {
sum.increment();
consumer.accept(v);
});
return (R) (Long) sum.sum();
} else if (collector.getClass() == ExecutingCollector.class) {
stream.forEach(((ExecutingCollector<? super I>) collector).getConsumer());
return null;
@ -270,6 +286,10 @@ public class StreamUtils {
return new ExecutingCollector<>(consumer);
}
public static <I> Collector<I, ?, Long> countingExecuting(Consumer<? super I> consumer) {
return new CountingExecutingCollector<>(consumer);
}
public static <I> Collector<I, ?, Void> iterating(Consumer<? super I> consumer) {
return new IteratingCollector<>(consumer);
}
@ -375,7 +395,7 @@ public class StreamUtils {
}
}
private abstract static sealed class AbstractExecutingCollector<I> implements Collector<I, Object, Void> {
private abstract static sealed class AbstractExecutingCollector<I, R> implements Collector<I, Object, R> {
private final Consumer<? super I> consumer;
@ -399,16 +419,14 @@ public class StreamUtils {
}
@Override
public Function<Object, Void> finisher() {
return FINISHER;
}
public abstract Function<Object, R> finisher();
public Consumer<? super I> getConsumer() {
return consumer;
}
}
private static final class ExecutingCollector<I> extends AbstractExecutingCollector<I> {
private static final class ExecutingCollector<I> extends AbstractExecutingCollector<I, Void> {
public ExecutingCollector(Consumer<? super I> consumer) {
super(consumer);
@ -418,14 +436,41 @@ public class StreamUtils {
public Set<Characteristics> characteristics() {
return CH_CONCURRENT_NOID;
}
@Override
public Function<Object, Void> finisher() {
return FINISHER;
}
}
private static final class IteratingCollector<I> extends AbstractExecutingCollector<I> {
private static final class CountingExecutingCollector<I> extends AbstractExecutingCollector<I, Long> {
public CountingExecutingCollector(Consumer<? super I> consumer) {
super(consumer);
}
@Override
public Function<Object, Long> finisher() {
throw new UnsupportedOperationException("This is a custom collector, do not use with the regular stream api");
}
@Override
public Set<Characteristics> characteristics() {
return CH_CONCURRENT_NOID;
}
}
private static final class IteratingCollector<I> extends AbstractExecutingCollector<I, Void> {
public IteratingCollector(Consumer<? super I> consumer) {
super(consumer);
}
@Override
public Function<Object, Void> finisher() {
return FINISHER;
}
@Override
public Set<Characteristics> characteristics() {
return CH_NOID;
@ -499,4 +544,21 @@ public class StreamUtils {
return CH_CONCURRENT_NOID;
}
}
private static class NamedForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
private final AtomicInteger nextWorkerId = new AtomicInteger(0);
private final String name;
public NamedForkJoinWorkerThreadFactory(String name) {
this.name = name;
}
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("ForkJoinPool-" + name + "-worker-" + nextWorkerId.getAndIncrement());
return worker;
}
}
}