Avoid multiple subscriptions to the same search query

This commit is contained in:
Andrea Cavalli 2021-03-05 16:17:37 +01:00
parent 2a8bec00d4
commit 08434d475c
4 changed files with 46 additions and 17 deletions

View File

@ -25,6 +25,9 @@ public class SearchResult<T, U> {
return this.results;
}
/**
* You must subscribe to both publishers
*/
public Tuple2<Flux<SearchResultItem<T, U>>, Mono<Long>> splitShared() {
Flux<LuceneSignal<SearchResultItem<T, U>>> shared = results.publish().refCount(2);
return Tuples.of(

View File

@ -5,6 +5,8 @@ import lombok.EqualsAndHashCode;
import lombok.ToString;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@EqualsAndHashCode
@ToString
@ -37,4 +39,15 @@ public class SearchResultKeys<T> {
})
);
}
/**
* You must subscribe to both publishers
*/
public Tuple2<Flux<SearchResultKey<T>>, Mono<Long>> splitShared() {
Flux<LuceneSignal<SearchResultKey<T>>> shared = results.publish().refCount(2);
return Tuples.of(
shared.filter(LuceneSignal::isValue).map(LuceneSignal::getValue).share(),
Mono.from(shared.filter(LuceneSignal::isTotalHitsCount).map(LuceneSignal::getTotalHitsCount)).cache()
);
}
}

View File

@ -537,7 +537,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.just(signal);
}
})
.dematerialize()
.<LLSearchResult>dematerialize()
);
}

View File

@ -176,21 +176,22 @@ public class LuceneUtils {
@Nullable Long limit) {
if (limit != null && limit == 0) {
return mappedMultiResults.flatMap(f -> f).ignoreElements().flux();
} else {
return mappedMultiResults.collectList().flatMapMany(mappedMultiResultsList -> {
Flux<T> mergedFlux;
if (sort == null) {
mergedFlux = Flux.merge(mappedMultiResultsList);
} else {
//noinspection unchecked
mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new));
}
if (limit == null || limit == Long.MAX_VALUE) {
return mergedFlux;
} else {
return mergedFlux.limitRequest(limit);
}
});
}
return mappedMultiResults.collectList().flatMapMany(mappedMultiResultsList -> {
Flux<T> mergedFlux;
if (sort == null) {
mergedFlux = Flux.merge(mappedMultiResultsList);
} else {
//noinspection unchecked
mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new));
}
if (limit == null || limit == Long.MAX_VALUE) {
return mergedFlux;
} else {
return mergedFlux.limitRequest(limit);
}
});
}
public static HandleResult collectTopDoc(Logger logger,
@ -228,7 +229,13 @@ public class LuceneUtils {
public static <T> Flux<LuceneSignal<T>> mergeSignalStream(Flux<Flux<LuceneSignal<T>>> mappedKeys,
MultiSort<LuceneSignal<T>> mappedSort,
Long limit) {
Flux<Flux<LuceneSignal<T>>> sharedMappedSignals = mappedKeys.publish().refCount(2);
Flux<Flux<LuceneSignal<T>>> sharedMappedSignals = mappedKeys
.map(sub -> sub
.publish()
.refCount(2)
)
.publish()
.refCount(2);
Flux<LuceneSignal<T>> sortedValues = LuceneUtils
.mergeStream(sharedMappedSignals.map(sub -> sub.filter(LuceneSignal::isValue)), mappedSort, limit);
//noinspection Convert2MethodRef
@ -244,7 +251,13 @@ public class LuceneUtils {
public static Flux<LLSignal> mergeSignalStreamRaw(Flux<Flux<LLSignal>> mappedKeys,
MultiSort<LLSignal> mappedSort,
Long limit) {
Flux<Flux<LLSignal>> sharedMappedSignals = mappedKeys.publish().refCount(2);
Flux<Flux<LLSignal>> sharedMappedSignals = mappedKeys
.map(sub -> sub
.publish()
.refCount(2)
)
.publish()
.refCount(2);
Flux<LLSignal> sortedValues = LuceneUtils
.mergeStream(sharedMappedSignals.map(sub -> sub.filter(LLSignal::isValue)), mappedSort, limit);