CavalliumDBEngine/src/main/java/it/cavallium/dbengine/client/CountedStream.java

48 lines
1.2 KiB
Java
Raw Normal View History

2021-04-01 23:56:34 +02:00
package it.cavallium.dbengine.client;
import java.util.Collection;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class CountedStream<T> {
private final Flux<T> stream;
private final long count;
public CountedStream(Flux<T> stream, long count) {
this.stream = stream;
this.count = count;
}
public Flux<T> getStream() {
return stream;
}
public long getCount() {
return count;
}
@SafeVarargs
public static <T> CountedStream<T> merge(CountedStream<T>... stream) {
return merge(List.of(stream));
}
public static <T> CountedStream<T> merge(Collection<CountedStream<T>> stream) {
return stream
.stream()
2021-04-03 19:09:06 +02:00
.reduce((a, b) -> new CountedStream<>(Flux.merge(a.getStream(), b.getStream()), a.getCount() + b.getCount()))
2021-04-01 23:56:34 +02:00
.orElseGet(() -> new CountedStream<>(Flux.empty(), 0));
}
public static <T> Mono<CountedStream<T>> merge(Flux<CountedStream<T>> stream) {
return stream
2021-04-03 19:09:06 +02:00
.reduce((a, b) -> new CountedStream<>(Flux.merge(a.getStream(), b.getStream()), a.getCount() + b.getCount()))
2021-04-01 23:56:34 +02:00
.switchIfEmpty(Mono.fromSupplier(() -> new CountedStream<>(Flux.empty(), 0)));
}
public Mono<List<T>> collectList() {
return stream.collectList();
}
}