CavalliumDBEngine/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java

58 lines
1.7 KiB
Java
Raw Normal View History

2021-02-03 20:13:17 +01:00
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
2021-03-03 15:03:25 +01:00
import lombok.EqualsAndHashCode;
import lombok.ToString;
2021-02-03 20:13:17 +01:00
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
2021-02-03 20:13:17 +01:00
2021-03-03 15:03:25 +01:00
@EqualsAndHashCode
@ToString
2021-02-03 20:13:17 +01:00
public class SearchResultKeys<T> {
2021-03-03 15:03:25 +01:00
private final Flux<LuceneSignal<SearchResultKey<T>>> results;
2021-02-03 20:13:17 +01:00
2021-03-03 15:03:25 +01:00
public SearchResultKeys(Flux<LuceneSignal<SearchResultKey<T>>> results) {
2021-02-03 20:13:17 +01:00
this.results = results;
}
public static <T, U> SearchResultKeys<T> empty() {
2021-03-03 15:03:25 +01:00
return new SearchResultKeys<>(Flux.just(LuceneSignal.totalHitsCount(0L)));
2021-02-03 20:13:17 +01:00
}
2021-03-03 15:03:25 +01:00
public Flux<LuceneSignal<SearchResultKey<T>>> results() {
2021-02-03 20:13:17 +01:00
return this.results;
}
public <U> SearchResult<T, U> withValues(ValueGetter<T, U> valuesGetter) {
2021-03-03 15:03:25 +01:00
return new SearchResult<>(
results.flatMapSequential(item -> {
if (item.isValue()) {
return valuesGetter
.get(item.getValue().getKey())
.map(value -> LuceneSignal.value(new SearchResultItem<>(item.getValue().getKey(), value, item.getValue().getScore())));
} else {
return Mono.just(item.mapTotalHitsCount());
}
})
2021-02-03 20:13:17 +01:00
);
}
2021-03-11 13:01:25 +01:00
public Flux<SearchResultKey<T>> onlyKeys() {
return this.results.filter(LuceneSignal::isValue).map(LuceneSignal::getValue);
}
/**
* You must subscribe to both publishers
*/
public Tuple2<Flux<SearchResultKey<T>>, Mono<Long>> splitShared() {
Flux<LuceneSignal<SearchResultKey<T>>> shared = results.publish().refCount(2);
return Tuples.of(
shared.filter(LuceneSignal::isValue).map(LuceneSignal::getValue).share(),
Mono.from(shared.filter(LuceneSignal::isTotalHitsCount).map(LuceneSignal::getTotalHitsCount)).cache()
);
}
2021-02-03 20:13:17 +01:00
}