Adaptive reactive lucene search engine, lazy results

This commit is contained in:
Andrea Cavalli 2021-07-05 12:05:45 +02:00
parent 7929f0dc8c
commit a5d4584a11
18 changed files with 393 additions and 569 deletions

View File

@ -10,6 +10,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@SuppressWarnings("unused")
public interface LuceneIndex<T, U> extends LLSnapshottable {
@ -42,16 +43,16 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
Mono<Void> deleteAll();
Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams, T key, U mltDocumentValue);
<V> Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>, V> queryParams, T key, U mltDocumentValue);
Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
<V> Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>, V> queryParams,
T key,
U mltDocumentValue,
ValueGetter<T, U> valueGetter);
Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams);
<V> Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>, V> queryParams);
Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
<V> Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>, V> queryParams,
ValueGetter<T, U> valueGetter);
Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query);

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.ClientQueryParamsBuilder;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLLuceneIndex;
@ -101,7 +100,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
}
private Mono<SearchResultKeys<T>> transformLuceneResult(LLSearchResult llSearchResult,
@Nullable MultiSort<SearchResultKey<T>> sort,
@Nullable MultiSort<SearchResultKey<T>, ?> sort,
LLScoreMode scoreMode,
long offset,
@Nullable Long limit) {
@ -109,59 +108,39 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
.results()
.map(flux -> new SearchResultKeys<>(flux
.results()
.map(signal -> new SearchResultKey<>(indicizer.getKey(signal.getKey()), signal.getScore())),
.map(signal -> new SearchResultKey<>(signal.key().map(indicizer::getKey), signal.score())),
flux.totalHitsCount()
));
MultiSort<SearchResultKey<T>> finalSort;
MultiSort<SearchResultKey<T>, ?> finalSort;
if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) {
finalSort = MultiSort.topScore();
} else {
finalSort = sort;
}
MultiSort<SearchResultKey<T>> mappedSort;
if (finalSort != null) {
mappedSort = new MultiSort<>(
finalSort.getQuerySort(),
(signal1, signal2) -> finalSort.getResultSort().compare((signal1), signal2)
);
} else {
mappedSort = null;
}
return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, offset, limit);
return LuceneUtils.mergeSignalStreamKeys(mappedKeys, finalSort, offset, limit);
}
private Mono<SearchResult<T, U>> transformLuceneResultWithValues(LLSearchResult llSearchResult,
@Nullable MultiSort<SearchResultItem<T, U>> sort,
private <V> Mono<SearchResult<T, U>> transformLuceneResultWithValues(LLSearchResult llSearchResult,
@Nullable MultiSort<SearchResultItem<T, U>, V> sort,
LLScoreMode scoreMode,
long offset,
@Nullable Long limit,
ValueGetter<T, U> valueGetter) {
Flux<SearchResult<T, U>> mappedKeys = llSearchResult
.results()
.map(flux -> new SearchResult<>(flux.results().flatMapSequential(signal -> {
var key = indicizer.getKey(signal.getKey());
return valueGetter
.get(key)
.map(value -> new SearchResultItem<>(key, value, signal.getScore()));
}), flux.totalHitsCount()));
MultiSort<SearchResultItem<T, U>> finalSort;
.map(flux -> new SearchResult<>(flux
.results()
.map(signal -> {
var key = signal.key().map(indicizer::getKey);
return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score());
}), flux.totalHitsCount()));
MultiSort<SearchResultItem<T, U>, ?> finalSort;
if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) {
finalSort = MultiSort.topScoreWithValues();
} else {
finalSort = sort;
}
MultiSort<SearchResultItem<T, U>> mappedSort;
if (finalSort != null) {
mappedSort = new MultiSort<>(
finalSort.getQuerySort(),
(signal1, signal2) -> finalSort.getResultSort().compare((signal1), signal2)
);
} else {
mappedSort = null;
}
return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, offset, limit);
return LuceneUtils.mergeSignalStreamItems(mappedKeys, finalSort, offset, limit);
}
/**
@ -172,7 +151,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
* @return the collection has one or more flux
*/
@Override
public Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams,
public <V> Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>, V> queryParams,
T key,
U mltDocumentValue) {
Flux<Tuple2<String, Set<String>>> mltDocumentFields
@ -197,7 +176,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
* @return the collection has one or more flux
*/
@Override
public Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
public <V> Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>, V> queryParams,
T key,
U mltDocumentValue,
ValueGetter<T, U> valueGetter) {
@ -226,7 +205,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
* @return the collection has one or more flux
*/
@Override
public Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams) {
public <V> Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>, V> queryParams) {
return luceneIndex
.search(resolveSnapshot(queryParams.snapshot()),
fixOffset(luceneIndex, queryParams.toQueryParams()),
@ -248,7 +227,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
* @return the collection has one or more flux
*/
@Override
public Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
public <V> Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>, V> queryParams,
ValueGetter<T, U> valueGetter) {
return luceneIndex
.search(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName())
@ -263,7 +242,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
@Override
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
return this.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
return this.search(ClientQueryParams.<SearchResultKey<T>, Object>builder().snapshot(snapshot).query(query).limit(0).build())
.map(SearchResultKeys::totalHitsCount);
}

View File

@ -6,85 +6,104 @@ import it.cavallium.dbengine.client.query.current.data.ScoreSort;
import it.cavallium.dbengine.client.query.current.data.Sort;
import it.cavallium.dbengine.database.LLKeyScore;
import java.util.Comparator;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
public class MultiSort<T> {
public class MultiSort<T, U> {
private final Sort querySort;
private final Comparator<T> resultSort;
@NotNull
private final Function<T, Mono<U>> transformer;
private final Comparator<U> resultSort;
public MultiSort(Sort querySort, Comparator<T> resultSort) {
public MultiSort(Sort querySort, Function<T, Mono<U>> transformer, Comparator<U> resultSort) {
this.querySort = querySort;
this.transformer = transformer;
this.resultSort = resultSort;
}
/**
* Sort a lucene field and the results by a numeric sort field and an int value
* @param fieldName Lucene SortedNumericSortField field name
* @param transformer Transform a value to a comparable value asynchronously
* @param toIntFunction function to retrieve the integer value of each result
* @param reverse descending sort
* @param <T> result type
* @return MultiSort object
*/
public static <T> MultiSort<T> sortedNumericInt(String fieldName, ToIntFunction<T> toIntFunction, boolean reverse) {
public static <T, U> MultiSort<T, U> sortedNumericInt(String fieldName,
Function<T, Mono<U>> transformer,
ToIntFunction<U> toIntFunction,
boolean reverse) {
// Create lucene sort
Sort querySort = NumericSort.of(fieldName, reverse);
// Create result sort
Comparator<T> resultSort = Comparator.comparingInt(toIntFunction);
Comparator<U> resultSort = Comparator.comparingInt(toIntFunction);
if (reverse) {
resultSort = resultSort.reversed();
}
// Return the multi sort
return new MultiSort<>(querySort, resultSort);
return new MultiSort<>(querySort, transformer, resultSort);
}
/**
* Sort a lucene field and the results by a numeric sort field and an long value
* @param fieldName Lucene SortedNumericSortField field name
* @param transformer Transform a value to a comparable value asynchronously
* @param toLongFunction function to retrieve the long value of each result
* @param reverse descending sort
* @param <T> result type
* @return MultiSort object
*/
public static <T> MultiSort<T> sortedNumericLong(String fieldName, ToLongFunction<T> toLongFunction, boolean reverse) {
public static <T, U> MultiSort<T, U> sortedNumericLong(String fieldName,
Function<T, Mono<U>> transformer,
ToLongFunction<U> toLongFunction,
boolean reverse) {
// Create lucene sort
Sort querySort = NumericSort.of(fieldName, reverse);
// Create result sort
Comparator<T> resultSort = Comparator.comparingLong(toLongFunction);
Comparator<U> resultSort = Comparator.comparingLong(toLongFunction);
if (!reverse) {
resultSort = resultSort.reversed();
}
// Return the multi sort
return new MultiSort<>(querySort, resultSort);
return new MultiSort<>(querySort, transformer, resultSort);
}
public static <T> MultiSort<T> randomSortField() {
return new MultiSort<>(RandomSort.of(), (a, b) -> 0);
public static <T> MultiSort<T, T> randomSortField() {
return new MultiSort<>(RandomSort.of(), Mono::just, (a, b) -> 0);
}
public static MultiSort<LLKeyScore> topScoreRaw() {
Comparator<LLKeyScore> comp = Comparator.comparingDouble(LLKeyScore::getScore).reversed();
return new MultiSort<>(ScoreSort.of(), comp);
public static MultiSort<LLKeyScore, LLKeyScore> topScoreRaw() {
Comparator<LLKeyScore> comp = Comparator.comparingDouble(LLKeyScore::score).reversed();
return new MultiSort<>(ScoreSort.of(), Mono::just, comp);
}
public static <T> MultiSort<SearchResultKey<T>> topScore() {
return new MultiSort<>(ScoreSort.of(), Comparator.<SearchResultKey<T>>comparingDouble(SearchResultKey::getScore).reversed());
public static <T> MultiSort<SearchResultKey<T>, SearchResultKey<T>> topScore() {
return new MultiSort<>(ScoreSort.of(), Mono::just, Comparator.<SearchResultKey<T>>comparingDouble(SearchResultKey::score).reversed());
}
public static <T, U> MultiSort<SearchResultItem<T, U>> topScoreWithValues() {
return new MultiSort<>(ScoreSort.of(), Comparator.<SearchResultItem<T, U>>comparingDouble(SearchResultItem::getScore).reversed());
public static <T, U> MultiSort<SearchResultItem<T, U>, SearchResultItem<T, U>> topScoreWithValues() {
return new MultiSort<>(ScoreSort.of(), Mono::just, Comparator.<SearchResultItem<T, U>>comparingDouble(SearchResultItem::score).reversed());
}
public Sort getQuerySort() {
return querySort;
}
public Comparator<T> getResultSort() {
@NotNull
public Function<T, Mono<U>> getTransformer() {
return transformer;
}
public Comparator<U> getResultSort() {
return resultSort;
}
}

View File

@ -1,57 +1,10 @@
package it.cavallium.dbengine.client;
import java.util.Objects;
import java.util.StringJoiner;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
public class SearchResultItem<T, U> implements Comparable<SearchResultItem<T, U>> {
private final T key;
private final U value;
private final float score;
public SearchResultItem(T key, U value, float score) {
this.key = key;
this.value = value;
this.score = score;
}
public float getScore() {
return score;
}
public T getKey() {
return key;
}
public U getValue() {
return value;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SearchResultItem<?, ?> that = (SearchResultItem<?, ?>) o;
return Float.compare(that.score, score) == 0 && Objects.equals(key, that.key) && Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(key, value, score);
}
@Override
public String toString() {
return new StringJoiner(", ", SearchResultItem.class.getSimpleName() + "[", "]")
.add("key=" + key)
.add("value=" + value)
.add("score=" + score)
.toString();
}
public record SearchResultItem<T, U>(Mono<T> key, Mono<U> value, float score)
implements Comparable<SearchResultItem<T, U>> {
@Override
public int compareTo(@NotNull SearchResultItem<T, U> o) {

View File

@ -2,46 +2,6 @@ package it.cavallium.dbengine.client;
import java.util.Objects;
import java.util.StringJoiner;
import reactor.core.publisher.Mono;
public class SearchResultKey<T> {
private final T key;
private final float score;
public SearchResultKey(T key, float score) {
this.key = key;
this.score = score;
}
public float getScore() {
return score;
}
public T getKey() {
return key;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SearchResultKey<?> that = (SearchResultKey<?>) o;
return Float.compare(that.score, score) == 0 && Objects.equals(key, that.key);
}
@Override
public int hashCode() {
return Objects.hash(key, score);
}
@Override
public String toString() {
return new StringJoiner(", ", SearchResultKey.class.getSimpleName() + "[", "]")
.add("key=" + key)
.add("score=" + score)
.toString();
}
}
public record SearchResultKey<T>(Mono<T> key, float score) {}

View File

@ -11,11 +11,9 @@ public record SearchResultKeys<T>(Flux<SearchResultKey<T>> results, long totalHi
}
public <U> SearchResult<T, U> withValues(ValueGetter<T, U> valuesGetter) {
return new SearchResult<>(
results.flatMapSequential(item -> valuesGetter
.get(item.getKey())
.map(value -> new SearchResultItem<>(item.getKey(), value, item.getScore()))),
totalHitsCount
);
return new SearchResult<>(results.map(item -> new SearchResultItem<>(item.key(),
item.key().flatMap(valuesGetter::get),
item.score()
)), totalHitsCount);
}
}

View File

@ -14,17 +14,17 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@RecordBuilder
public final record ClientQueryParams<T>(@Nullable CompositeSnapshot snapshot,
public final record ClientQueryParams<T, U>(@Nullable CompositeSnapshot snapshot,
@NotNull Query query,
long offset,
long limit,
@Nullable Float minCompetitiveScore,
@Nullable MultiSort<T> sort,
@Nullable MultiSort<T, U> sort,
@NotNull LLScoreMode scoreMode) {
public static <T> ClientQueryParamsBuilder<T> builder() {
public static <T, U> ClientQueryParamsBuilder<T, U> builder() {
return ClientQueryParamsBuilder
.<T>builder()
.<T, U>builder()
.snapshot(null)
.offset(0)
.limit(Long.MAX_VALUE)

View File

@ -1,24 +1,10 @@
package it.cavallium.dbengine.database;
import java.util.Objects;
import java.util.StringJoiner;
import reactor.core.publisher.Mono;
public class LLKeyScore {
private final String key;
private final float score;
public LLKeyScore(String key, float score) {
this.key = key;
this.score = score;
}
public String getKey() {
return key;
}
public float getScore() {
return score;
}
public record LLKeyScore(int docId, float score, Mono<String> key) {
@Override
public boolean equals(Object o) {
@ -29,20 +15,19 @@ public class LLKeyScore {
return false;
}
LLKeyScore that = (LLKeyScore) o;
return score == that.score &&
Objects.equals(key, that.key);
return docId == that.docId && Float.compare(that.score, score) == 0;
}
@Override
public int hashCode() {
return Objects.hash(key, score);
return Objects.hash(docId, score);
}
@Override
public String toString() {
return "LLKeyScore{" +
"key=" + key +
", score=" + score +
'}';
return new StringJoiner(", ", LLKeyScore.class.getSimpleName() + "[", "]")
.add("docId=" + docId)
.add("score=" + score)
.toString();
}
}

View File

@ -8,20 +8,16 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.IllegalReferenceCountException;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.lucene.RandomSortField;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatPoint;
@ -167,7 +163,7 @@ public class LLUtils {
}
public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) {
return new it.cavallium.dbengine.database.LLKeyScore(hit.getKey(), hit.getScore());
return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.score(), hit.key());
}
public static String toStringSafe(ByteBuf key) {

View File

@ -25,7 +25,6 @@ import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneReactiveSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.SortedPagedLuceneReactiveSearcher;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
@ -36,7 +35,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.ConcurrentMergeScheduler;
@ -68,7 +66,6 @@ import org.apache.lucene.util.Constants;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@ -554,7 +551,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
+ " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity);
}
// Get the reference doc and apply it to MoreLikeThis, to generate the query
// Get the reference docId and apply it to MoreLikeThis, to generate the query
var mltQuery = mlt.like((Map) mltDocumentFields);
Query luceneQuery;
if (luceneAdditionalQuery != null) {
@ -587,7 +584,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
private LLKeyScore fixKeyScore(LLKeyScore keyScore, int scoreDivisor) {
return scoreDivisor == 1 ? keyScore : new LLKeyScore(keyScore.getKey(), keyScore.getScore() / (float) scoreDivisor);
return scoreDivisor == 1 ? keyScore
: new LLKeyScore(keyScore.docId(), keyScore.score() / (float) scoreDivisor, keyScore.key());
}
@Override

View File

@ -57,6 +57,8 @@ import org.novasearch.lucene.search.similarities.RobertsonSimilarity;
import org.warp.commonutils.log.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class LuceneUtils {
private static final Analyzer lucene4GramWordsAnalyzerEdgeInstance = new NCharGramEdgeAnalyzer(true, 4, 4);
@ -161,8 +163,9 @@ public class LuceneUtils {
/**
* Merge streams together maintaining absolute order
*/
public static <T> Flux<T> mergeStream(Flux<Flux<T>> mappedMultiResults,
@Nullable MultiSort<T> sort,
@SuppressWarnings({"unchecked"})
public static <T, U> Flux<T> mergeStream(Flux<Flux<T>> mappedMultiResults,
@Nullable MultiSort<T, U> sort,
long offset,
@Nullable Long limit) {
if (limit != null && limit == 0) {
@ -173,8 +176,15 @@ public class LuceneUtils {
if (sort == null) {
mergedFlux = Flux.merge(mappedMultiResultsList);
} else {
//noinspection unchecked
mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new));
mergedFlux = Flux
.mergeOrdered(32,
(a, b) -> sort.getResultSort().compare(a.getT2(), b.getT2()),
(Flux<Tuple2<T, U>>[]) mappedMultiResultsList.stream()
.map(flux -> flux.flatMapSequential(entry -> sort.getTransformer().apply(entry)
.map(transformed -> Tuples.of(entry, transformed))))
.toArray(Flux[]::new)
)
.map(Tuple2::getT1);
}
Flux<T> offsetedFlux;
if (offset > 0) {
@ -214,7 +224,8 @@ public class LuceneUtils {
if (field == null) {
logger.error("Can't get key of document docId: {}, score: {}", docId, score);
} else {
if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) {
if (resultsConsumer.accept(new LLKeyScore(docId, score, Mono.just(field.stringValue())))
== HandleResult.HALT) {
return HandleResult.HALT;
}
}
@ -225,75 +236,71 @@ public class LuceneUtils {
/**
*
* @return the key score, or null if the result is not relevant
* @throws IOException if an error occurs
* @return false if the result is not relevant
*/
@Nullable
public static LLKeyScore collectTopDoc(Logger logger, int docId, float score, Float minCompetitiveScore,
IndexSearcher indexSearcher, String keyFieldName) throws IOException {
if (minCompetitiveScore == null || score >= minCompetitiveScore) {
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append("The document docId: ").append(docId).append(", score: ").append(score).append(" is empty.");
var realFields = indexSearcher.doc(docId).getFields();
if (!realFields.isEmpty()) {
sb.append("\n");
logger.error("Present fields:\n");
boolean first = true;
for (IndexableField field : realFields) {
if (first) {
first = false;
} else {
sb.append("\n");
}
sb.append(" - ").append(field.name());
public static boolean filterTopDoc(float score, Float minCompetitiveScore) {
return minCompetitiveScore == null || score >= minCompetitiveScore;
}
@Nullable
public static String keyOfTopDoc(Logger logger, int docId, IndexSearcher indexSearcher,
String keyFieldName) throws IOException {
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append("The document docId: ").append(docId).append(" is empty.");
var realFields = indexSearcher.doc(docId).getFields();
if (!realFields.isEmpty()) {
sb.append("\n");
logger.error("Present fields:\n");
boolean first = true;
for (IndexableField field : realFields) {
if (first) {
first = false;
} else {
sb.append("\n");
}
}
throw new IOException(sb.toString());
} else {
var field = d.getField(keyFieldName);
if (field == null) {
throw new IOException("Can't get key of document docId: " + docId + ", score: " + score);
} else {
return new LLKeyScore(field.stringValue(), score);
sb.append(" - ").append(field.name());
}
}
throw new IOException(sb.toString());
} else {
return null;
var field = d.getField(keyFieldName);
if (field == null) {
throw new IOException("Can't get key of document docId: " + docId);
} else {
return field.stringValue();
}
}
}
public static <T> Mono<SearchResultKeys<T>> mergeSignalStreamKeys(Flux<SearchResultKeys<T>> mappedKeys,
MultiSort<SearchResultKey<T>> sort,
public static <T, V> Mono<SearchResultKeys<T>> mergeSignalStreamKeys(Flux<SearchResultKeys<T>> mappedKeys,
MultiSort<SearchResultKey<T>, V> sort,
long offset,
Long limit) {
return mappedKeys.reduce(
new SearchResultKeys<>(Flux.empty(), 0L),
(a, b) -> new SearchResultKeys<>(LuceneUtils.mergeStream(Flux.just(a.results(), b.results()),
sort,
offset,
limit
sort, offset, limit
), a.totalHitsCount() + b.totalHitsCount())
);
}
public static <T, U> Mono<SearchResult<T, U>> mergeSignalStreamItems(Flux<SearchResult<T, U>> mappedKeys,
MultiSort<SearchResultItem<T, U>> sort,
public static <T, U, V> Mono<SearchResult<T, U>> mergeSignalStreamItems(Flux<SearchResult<T, U>> mappedKeys,
MultiSort<SearchResultItem<T, U>, V> sort,
long offset,
Long limit) {
return mappedKeys.reduce(
new SearchResult<>(Flux.empty(), 0L),
(a, b) -> new SearchResult<>(LuceneUtils.mergeStream(Flux.just(a.results(), b.results()),
sort,
offset,
limit
sort, offset, limit
), a.totalHitsCount() + b.totalHitsCount())
);
}
public static Mono<LLSearchResultShard> mergeSignalStreamRaw(Flux<LLSearchResultShard> mappedKeys,
MultiSort<LLKeyScore> mappedSort,
MultiSort<LLKeyScore, LLKeyScore> mappedSort,
long offset,
Long limit) {
return mappedKeys.reduce(

View File

@ -10,10 +10,11 @@ import reactor.core.scheduler.Scheduler;
public class AdaptiveReactiveSearcher implements LuceneReactiveSearcher {
public static final int PAGED_THRESHOLD = 1000;
private static final LuceneReactiveSearcher count = new CountLuceneReactiveSearcher();
private static final LuceneReactiveSearcher sortedPaged = new SortedPagedLuceneReactiveSearcher();
private static final LuceneReactiveSearcher unsortedPaged = new UnsortedPagedLuceneReactiveSearcher();
private static final LuceneReactiveSearcher paged = new PagedLuceneReactiveSearcher();
private static final LuceneReactiveSearcher simple = new SimpleLuceneReactiveSearcher();
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
@ -37,8 +38,8 @@ public class AdaptiveReactiveSearcher implements LuceneReactiveSearcher {
scheduler
);
}
if (luceneSort != null) {
return sortedPaged.search(indexSearcher,
if (offset + limit > PAGED_THRESHOLD) {
return paged.search(indexSearcher,
query,
offset,
limit,
@ -49,7 +50,7 @@ public class AdaptiveReactiveSearcher implements LuceneReactiveSearcher {
scheduler
);
} else {
return unsortedPaged.search(indexSearcher,
return simple.search(indexSearcher,
query,
offset,
limit,

View File

@ -1,13 +1,20 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@ -36,4 +43,27 @@ public interface LuceneReactiveSearcher {
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler);
static Flux<LLKeyScore> convertHits(
ScoreDoc[] hits,
IndexSearcher indexSearcher,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
return Flux
.fromArray(hits)
.map(hit -> {
int shardDocId = hit.doc;
float score = hit.score;
var keyMono = Mono.fromCallable(() -> {
if (!LuceneUtils.filterTopDoc(score, minCompetitiveScore)) {
return null;
}
//noinspection BlockingMethodInNonBlockingContext
@Nullable String collectedDoc = LuceneUtils.keyOfTopDoc(logger, shardDocId, indexSearcher, keyFieldName);
return collectedDoc;
}).subscribeOn(scheduler);
return new LLKeyScore(shardDocId, score, keyMono);
});
}
}

View File

@ -0,0 +1,151 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class PagedLuceneReactiveSearcher implements LuceneReactiveSearcher {
private static final int FIRST_PAGE_HITS_MAX_COUNT = 10;
private static final long MIN_HITS_PER_PAGE = 20;
private static final long MAX_HITS_PER_PAGE = 1000;
@SuppressWarnings("BlockingMethodInNonBlockingContext")
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
// todo: check if offset and limit play well together.
// check especially these cases:
// - offset > limit
// - offset > FIRST_PAGE_HITS_MAX_COUNT
// - offset > MAX_HITS_PER_PAGE
return Mono
.fromCallable(() -> {
// Run the first page search
TopDocs firstTopDocsVal;
if (offset == 0) {
if (luceneSort != null) {
firstTopDocsVal = indexSearcher.search(query,
FIRST_PAGE_HITS_MAX_COUNT,
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
} else {
firstTopDocsVal = indexSearcher.search(query,
FIRST_PAGE_HITS_MAX_COUNT
);
}
} else {
firstTopDocsVal = TopDocsSearcher.getTopDocs(indexSearcher,
query,
luceneSort,
FIRST_PAGE_HITS_MAX_COUNT,
null,
scoreMode != ScoreMode.COMPLETE_NO_SCORES,
1000,
offset, FIRST_PAGE_HITS_MAX_COUNT);
}
long totalHitsCount = firstTopDocsVal.totalHits.value;
Flux<LLKeyScore> firstPageHitsFlux = LuceneReactiveSearcher.convertHits(
firstTopDocsVal.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName,
scheduler
);
Flux<LLKeyScore> nextPagesFlux = Flux
.<Flux<LLKeyScore>, PageState>generate(
() -> new PageState(getLastItem(firstTopDocsVal.scoreDocs), 0),
(s, sink) -> {
if (s.lastItem() == null) {
sink.complete();
return new PageState(null, 0);
}
try {
TopDocs lastTopDocs;
if (luceneSort != null) {
lastTopDocs = indexSearcher.searchAfter(s.lastItem(),
query,
s.hitsPerPage(),
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
} else {
lastTopDocs = indexSearcher.searchAfter(s.lastItem(),
query,
s.hitsPerPage()
);
}
if (lastTopDocs.scoreDocs.length > 0) {
ScoreDoc lastItem = getLastItem(lastTopDocs.scoreDocs);
var hitsList = LuceneReactiveSearcher.convertHits(
lastTopDocs.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName,
scheduler
);
sink.next(hitsList);
return new PageState(lastItem, s.currentPageIndex() + 1);
} else {
sink.complete();
return new PageState(null, 0);
}
} catch (IOException e) {
sink.error(e);
return new PageState(null, 0);
}
}
)
.subscribeOn(scheduler)
.concatMap(Flux::hide);
Flux<LLKeyScore> resultsFlux = firstPageHitsFlux
.concatWith(nextPagesFlux)
.take(limit, true);
if (limit == 0) {
return new LuceneReactiveSearchInstance(totalHitsCount, Flux.empty());
} else {
return new LuceneReactiveSearchInstance(totalHitsCount, resultsFlux);
}
})
.subscribeOn(scheduler);
}
private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) {
return scoreDocs[scoreDocs.length - 1];
}
private record PageState(ScoreDoc lastItem, int currentPageIndex) {
public int hitsPerPage() {
return (int) Math.min(MAX_HITS_PER_PAGE, MIN_HITS_PER_PAGE * (1L << currentPageIndex));
}
}
}

View File

@ -13,6 +13,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
/**
* Unsorted search (low latency and constant memory usage)
@ -74,7 +75,8 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
if (field == null) {
logger.error("Can't get key of document docId: {}", docId);
} else {
if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) {
if (resultsConsumer.accept(new LLKeyScore(docId, score, Mono.just(field.stringValue())))
== HandleResult.HALT) {
return HandleResult.HALT;
}
}

View File

@ -0,0 +1,62 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class SimpleLuceneReactiveSearcher implements LuceneReactiveSearcher {
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
return Mono
.fromCallable(() -> {
TopDocs topDocs;
if (luceneSort == null) {
//noinspection BlockingMethodInNonBlockingContext
topDocs = indexSearcher.search(query,
offset + limit
);
} else {
//noinspection BlockingMethodInNonBlockingContext
topDocs = indexSearcher.search(query,
offset + limit,
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
}
Flux<LLKeyScore> hitsMono = LuceneReactiveSearcher
.convertHits(
topDocs.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName,
scheduler
)
.take(limit, true);
return new LuceneReactiveSearchInstance(topDocs.totalHits.value, hitsMono);
})
.subscribeOn(scheduler);
}
}

View File

@ -1,164 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class SortedPagedLuceneReactiveSearcher implements LuceneReactiveSearcher {
private static final int FIRST_PAGE_HITS_MAX_COUNT = 10;
private static final long MIN_HITS_PER_PAGE = 20;
private static final long MAX_HITS_PER_PAGE = 1000;
@SuppressWarnings("BlockingMethodInNonBlockingContext")
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
if (luceneSort == null) {
return Mono.error(new IllegalArgumentException("Can't execute unsorted queries"));
}
// todo: check if offset and limit play well together.
// check especially these cases:
// - offset > limit
// - offset > FIRST_PAGE_HITS_MAX_COUNT
// - offset > MAX_HITS_PER_PAGE
return Mono
.fromCallable(() -> {
// Run the first page (max 1 item) search
TopDocs firstTopDocsVal;
if (offset == 0) {
firstTopDocsVal = indexSearcher.search(query,
FIRST_PAGE_HITS_MAX_COUNT,
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
} else {
firstTopDocsVal = TopDocsSearcher.getTopDocs(indexSearcher,
query,
luceneSort,
FIRST_PAGE_HITS_MAX_COUNT,
null,
scoreMode != ScoreMode.COMPLETE_NO_SCORES,
1000,
offset, FIRST_PAGE_HITS_MAX_COUNT);
}
long totalHitsCount = firstTopDocsVal.totalHits.value;
Mono<List<LLKeyScore>> firstPageHitsMono = Mono
.fromCallable(() -> convertHits(FIRST_PAGE_HITS_MAX_COUNT, firstTopDocsVal.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName))
.single();
Flux<LLKeyScore> resultsFlux = firstPageHitsMono.flatMapMany(firstPageHits -> {
int firstPageHitsCount = firstPageHits.size();
Flux<LLKeyScore> firstPageHitsFlux = Flux.fromIterable(firstPageHits);
if (firstPageHitsCount < FIRST_PAGE_HITS_MAX_COUNT) {
return Flux.fromIterable(firstPageHits);
} else {
Flux<LLKeyScore> nextPagesFlux = Flux
.<List<LLKeyScore>, PageState>generate(
() -> new PageState(getLastItem(firstTopDocsVal.scoreDocs), 0, limit - firstPageHitsCount),
(s, sink) -> {
if (s.lastItem() == null || s.remainingLimit() <= 0) {
sink.complete();
return new PageState(null, 0,0);
}
try {
var lastTopDocs = indexSearcher.searchAfter(s.lastItem(),
query,
s.hitsPerPage(),
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
if (lastTopDocs.scoreDocs.length > 0) {
ScoreDoc lastItem = getLastItem(lastTopDocs.scoreDocs);
var hitsList = convertHits(s.remainingLimit(),
lastTopDocs.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName
);
sink.next(hitsList);
if (hitsList.size() < s.hitsPerPage()) {
return new PageState(lastItem, 0, 0);
} else {
return new PageState(lastItem, s.currentPageIndex() + 1, s.remainingLimit() - hitsList.size());
}
} else {
sink.complete();
return new PageState(null, 0, 0);
}
} catch (IOException e) {
sink.error(e);
return new PageState(null, 0, 0);
}
}
)
.subscribeOn(scheduler)
.flatMap(Flux::fromIterable);
return Flux.concat(firstPageHitsFlux, nextPagesFlux);
}
});
if (limit == 0) {
return new LuceneReactiveSearchInstance(totalHitsCount, Flux.empty());
} else {
return new LuceneReactiveSearchInstance(totalHitsCount, resultsFlux);
}
})
.subscribeOn(scheduler);
}
private List<LLKeyScore> convertHits(int currentAllowedResults,
ScoreDoc[] hits,
IndexSearcher indexSearcher,
@Nullable Float minCompetitiveScore,
String keyFieldName) throws IOException {
ArrayList<LLKeyScore> collectedResults = new ArrayList<>(hits.length);
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
if (currentAllowedResults-- > 0) {
@Nullable LLKeyScore collectedDoc = LuceneUtils.collectTopDoc(logger, docId, score,
minCompetitiveScore, indexSearcher, keyFieldName);
if (collectedDoc != null) {
collectedResults.add(collectedDoc);
}
} else {
break;
}
}
return collectedResults;
}
private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) {
return scoreDocs[scoreDocs.length - 1];
}
private record PageState(ScoreDoc lastItem, int currentPageIndex, int remainingLimit) {
public int hitsPerPage() {
return (int) Math.min(MAX_HITS_PER_PAGE, MIN_HITS_PER_PAGE * (1L << currentPageIndex));
}
}
}

View File

@ -1,154 +0,0 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class UnsortedPagedLuceneReactiveSearcher implements LuceneReactiveSearcher {
private static final int FIRST_PAGE_HITS_MAX_COUNT = 10;
private static final long MIN_HITS_PER_PAGE = 20;
private static final long MAX_HITS_PER_PAGE = 1000;
@SuppressWarnings("BlockingMethodInNonBlockingContext")
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
if (luceneSort != null) {
return Mono.error(new IllegalArgumentException("Can't search sorted queries"));
}
// todo: check if offset and limit play well together.
// check especially these cases:
// - offset > limit
// - offset > FIRST_PAGE_HITS_MAX_COUNT
// - offset > MAX_HITS_PER_PAGE
return Mono
.fromCallable(() -> {
// Run the first page (max 1 item) search
TopDocs firstTopDocsVal;
if (offset == 0) {
firstTopDocsVal = indexSearcher.search(query, FIRST_PAGE_HITS_MAX_COUNT);
} else {
firstTopDocsVal = TopDocsSearcher.getTopDocs(indexSearcher,
query,
null,
FIRST_PAGE_HITS_MAX_COUNT,
null,
scoreMode != ScoreMode.COMPLETE_NO_SCORES,
1000,
offset, FIRST_PAGE_HITS_MAX_COUNT);
}
long totalHitsCount = firstTopDocsVal.totalHits.value;
Mono<List<LLKeyScore>> firstPageHitsMono = Mono
.fromCallable(() -> convertHits(FIRST_PAGE_HITS_MAX_COUNT, firstTopDocsVal.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName))
.single();
Flux<LLKeyScore> resultsFlux = firstPageHitsMono.flatMapMany(firstPageHits -> {
int firstPageHitsCount = firstPageHits.size();
Flux<LLKeyScore> firstPageHitsFlux = Flux.fromIterable(firstPageHits);
if (firstPageHitsCount < FIRST_PAGE_HITS_MAX_COUNT) {
return Flux.fromIterable(firstPageHits);
} else {
Flux<LLKeyScore> nextPagesFlux = Flux
.<List<LLKeyScore>, PageState>generate(
() -> new PageState(getLastItem(firstTopDocsVal.scoreDocs), 0, limit - firstPageHitsCount),
(s, sink) -> {
if (s.lastItem() == null || s.remainingLimit() <= 0) {
sink.complete();
return new PageState(null, 0,0);
}
try {
var lastTopDocs = indexSearcher.searchAfter(s.lastItem(), query, s.hitsPerPage());
if (lastTopDocs.scoreDocs.length > 0) {
ScoreDoc lastItem = getLastItem(lastTopDocs.scoreDocs);
var hitsList = convertHits(s.remainingLimit(),
lastTopDocs.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName
);
sink.next(hitsList);
if (hitsList.size() < s.hitsPerPage()) {
return new PageState(lastItem, 0, 0);
} else {
return new PageState(lastItem, s.currentPageIndex() + 1, s.remainingLimit() - hitsList.size());
}
} else {
sink.complete();
return new PageState(null, 0, 0);
}
} catch (IOException e) {
sink.error(e);
return new PageState(null, 0, 0);
}
}
)
.subscribeOn(scheduler)
.flatMap(Flux::fromIterable);
return Flux.concat(firstPageHitsFlux, nextPagesFlux);
}
});
if (limit == 0) {
return new LuceneReactiveSearchInstance(totalHitsCount, Flux.empty());
} else {
return new LuceneReactiveSearchInstance(totalHitsCount, resultsFlux);
}
})
.subscribeOn(scheduler);
}
private List<LLKeyScore> convertHits(int currentAllowedResults,
ScoreDoc[] hits,
IndexSearcher indexSearcher,
@Nullable Float minCompetitiveScore,
String keyFieldName) throws IOException {
ArrayList<LLKeyScore> collectedResults = new ArrayList<>(hits.length);
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
if (currentAllowedResults-- > 0) {
@Nullable LLKeyScore collectedDoc = LuceneUtils.collectTopDoc(logger, docId, score,
minCompetitiveScore, indexSearcher, keyFieldName);
if (collectedDoc != null) {
collectedResults.add(collectedDoc);
}
} else {
break;
}
}
return collectedResults;
}
private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) {
return scoreDocs[scoreDocs.length - 1];
}
private record PageState(ScoreDoc lastItem, int currentPageIndex, int remainingLimit) {
public int hitsPerPage() {
return (int) Math.min(MAX_HITS_PER_PAGE, MIN_HITS_PER_PAGE * (1L << currentPageIndex));
}
}
}