Merge totalhitscount in merged streams

This commit is contained in:
Andrea Cavalli 2021-03-03 17:29:14 +01:00
parent 319abeaf30
commit a06d448182
3 changed files with 39 additions and 4 deletions

View File

@ -124,7 +124,7 @@ public class IndicizationExample {
.build(),
"id"
))
.flatMap(results -> LuceneUtils.mergeStream(results
.flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results
.results(), MultiSort.topScoreRaw(), 10L)
.doOnNext(value -> System.out.println("Value: " + value))
.then(Mono.from(results

View File

@ -113,8 +113,7 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
} else {
mappedSort = null;
}
Flux<LuceneSignal<SearchResultKey<T>>> sortedKeys = LuceneUtils.mergeStream(mappedKeys, mappedSort, limit);
return new SearchResultKeys<>(sortedKeys);
return new SearchResultKeys<>(LuceneUtils.mergeSignalStream(mappedKeys, mappedSort, limit));
}
private SearchResult<T, U> transformLuceneResultWithValues(LLSearchResult llSearchResult,
@ -153,7 +152,7 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
} else {
mappedSort = null;
}
var sortedKeys = LuceneUtils.mergeStream(mappedKeys, mappedSort, limit);
var sortedKeys = LuceneUtils.mergeSignalStream(mappedKeys, mappedSort, limit);
return new SearchResult<>(sortedKeys);
}

View File

@ -1,7 +1,10 @@
package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.client.LuceneSignal;
import it.cavallium.dbengine.client.MultiSort;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLSignal;
import it.cavallium.dbengine.database.LLTotalHitsCount;
import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
@ -34,6 +37,7 @@ import org.novasearch.lucene.search.similarities.LtcSimilarity;
import org.novasearch.lucene.search.similarities.RobertsonSimilarity;
import org.warp.commonutils.log.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class LuceneUtils {
private static final Analyzer lucene4GramWordsAnalyzerEdgeInstance = new NCharGramEdgeAnalyzer(true, 4, 4);
@ -220,4 +224,36 @@ public class LuceneUtils {
}
return HandleResult.CONTINUE;
}
public static <T> Flux<LuceneSignal<T>> mergeSignalStream(Flux<Flux<LuceneSignal<T>>> mappedKeys,
MultiSort<LuceneSignal<T>> mappedSort,
Long limit) {
Flux<Flux<LuceneSignal<T>>> sharedMappedSignals = mappedKeys.publish().refCount(2);
Flux<LuceneSignal<T>> sortedValues = LuceneUtils
.mergeStream(sharedMappedSignals.map(sub -> sub.filter(LuceneSignal::isValue)), mappedSort, limit);
//noinspection Convert2MethodRef
Mono<LuceneSignal<T>> sortedTotalSize = sharedMappedSignals
.flatMap(sub -> sub)
.filter(LuceneSignal::isTotalHitsCount)
.map(LuceneSignal::getTotalHitsCount)
.reduce(Long::sum)
.map(sum -> LuceneSignal.totalHitsCount(sum));
return sortedValues.mergeWith(sortedTotalSize);
}
public static Flux<LLSignal> mergeSignalStreamRaw(Flux<Flux<LLSignal>> mappedKeys,
MultiSort<LLSignal> mappedSort,
Long limit) {
Flux<Flux<LLSignal>> sharedMappedSignals = mappedKeys.publish().refCount(2);
Flux<LLSignal> sortedValues = LuceneUtils
.mergeStream(sharedMappedSignals.map(sub -> sub.filter(LLSignal::isValue)), mappedSort, limit);
//noinspection Convert2MethodRef
Mono<LLSignal> sortedTotalSize = sharedMappedSignals
.flatMap(sub -> sub)
.filter(LLSignal::isTotalHitsCount)
.map(LLSignal::getTotalHitsCount)
.reduce(Long::sum)
.map(sum -> new LLTotalHitsCount(sum));
return sortedValues.mergeWith(sortedTotalSize);
}
}