From a5d4584a11c3219a4a098ada7139b18373007175 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 5 Jul 2021 12:05:45 +0200 Subject: [PATCH] Adaptive reactive lucene search engine, lazy results --- .../dbengine/client/LuceneIndex.java | 9 +- .../dbengine/client/LuceneIndexImpl.java | 59 ++----- .../cavallium/dbengine/client/MultiSort.java | 57 ++++-- .../dbengine/client/SearchResultItem.java | 53 +----- .../dbengine/client/SearchResultKey.java | 44 +---- .../dbengine/client/SearchResultKeys.java | 10 +- .../client/query/ClientQueryParams.java | 8 +- .../dbengine/database/LLKeyScore.java | 33 +--- .../cavallium/dbengine/database/LLUtils.java | 6 +- .../database/disk/LLLocalLuceneIndex.java | 8 +- .../dbengine/lucene/LuceneUtils.java | 99 ++++++----- .../searcher/AdaptiveReactiveSearcher.java | 11 +- .../searcher/LuceneReactiveSearcher.java | 30 ++++ .../searcher/PagedLuceneReactiveSearcher.java | 151 ++++++++++++++++ .../ParallelCollectorStreamSearcher.java | 4 +- .../SimpleLuceneReactiveSearcher.java | 62 +++++++ .../SortedPagedLuceneReactiveSearcher.java | 164 ------------------ .../UnsortedPagedLuceneReactiveSearcher.java | 154 ---------------- 18 files changed, 393 insertions(+), 569 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLuceneReactiveSearcher.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneReactiveSearcher.java delete mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/SortedPagedLuceneReactiveSearcher.java delete mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedPagedLuceneReactiveSearcher.java diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index 425d5fa..61a17b9 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -10,6 +10,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; @SuppressWarnings("unused") public interface LuceneIndex extends LLSnapshottable { @@ -42,16 +43,16 @@ public interface LuceneIndex extends LLSnapshottable { Mono deleteAll(); - Mono> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue); + Mono> moreLikeThis(ClientQueryParams, V> queryParams, T key, U mltDocumentValue); - Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, + Mono> moreLikeThisWithValues(ClientQueryParams, V> queryParams, T key, U mltDocumentValue, ValueGetter valueGetter); - Mono> search(ClientQueryParams> queryParams); + Mono> search(ClientQueryParams, V> queryParams); - Mono> searchWithValues(ClientQueryParams> queryParams, + Mono> searchWithValues(ClientQueryParams, V> queryParams, ValueGetter valueGetter); Mono count(@Nullable CompositeSnapshot snapshot, Query query); diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 18fbfd7..a5a50b7 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -1,7 +1,6 @@ package it.cavallium.dbengine.client; import it.cavallium.dbengine.client.query.ClientQueryParams; -import it.cavallium.dbengine.client.query.ClientQueryParamsBuilder; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLLuceneIndex; @@ -101,7 +100,7 @@ public class LuceneIndexImpl implements LuceneIndex { } private Mono> transformLuceneResult(LLSearchResult llSearchResult, - @Nullable MultiSort> sort, + @Nullable MultiSort, ?> sort, LLScoreMode scoreMode, long offset, @Nullable Long limit) { @@ -109,59 +108,39 @@ public class LuceneIndexImpl implements LuceneIndex { .results() .map(flux -> new SearchResultKeys<>(flux .results() - .map(signal -> new SearchResultKey<>(indicizer.getKey(signal.getKey()), signal.getScore())), + .map(signal -> new SearchResultKey<>(signal.key().map(indicizer::getKey), signal.score())), flux.totalHitsCount() )); - MultiSort> finalSort; + MultiSort, ?> finalSort; if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { finalSort = MultiSort.topScore(); } else { finalSort = sort; } - - MultiSort> mappedSort; - if (finalSort != null) { - mappedSort = new MultiSort<>( - finalSort.getQuerySort(), - (signal1, signal2) -> finalSort.getResultSort().compare((signal1), signal2) - ); - } else { - mappedSort = null; - } - return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, offset, limit); + return LuceneUtils.mergeSignalStreamKeys(mappedKeys, finalSort, offset, limit); } - private Mono> transformLuceneResultWithValues(LLSearchResult llSearchResult, - @Nullable MultiSort> sort, + private Mono> transformLuceneResultWithValues(LLSearchResult llSearchResult, + @Nullable MultiSort, V> sort, LLScoreMode scoreMode, long offset, @Nullable Long limit, ValueGetter valueGetter) { Flux> mappedKeys = llSearchResult .results() - .map(flux -> new SearchResult<>(flux.results().flatMapSequential(signal -> { - var key = indicizer.getKey(signal.getKey()); - return valueGetter - .get(key) - .map(value -> new SearchResultItem<>(key, value, signal.getScore())); - }), flux.totalHitsCount())); - MultiSort> finalSort; + .map(flux -> new SearchResult<>(flux + .results() + .map(signal -> { + var key = signal.key().map(indicizer::getKey); + return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score()); + }), flux.totalHitsCount())); + MultiSort, ?> finalSort; if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { finalSort = MultiSort.topScoreWithValues(); } else { finalSort = sort; } - - MultiSort> mappedSort; - if (finalSort != null) { - mappedSort = new MultiSort<>( - finalSort.getQuerySort(), - (signal1, signal2) -> finalSort.getResultSort().compare((signal1), signal2) - ); - } else { - mappedSort = null; - } - return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, offset, limit); + return LuceneUtils.mergeSignalStreamItems(mappedKeys, finalSort, offset, limit); } /** @@ -172,7 +151,7 @@ public class LuceneIndexImpl implements LuceneIndex { * @return the collection has one or more flux */ @Override - public Mono> moreLikeThis(ClientQueryParams> queryParams, + public Mono> moreLikeThis(ClientQueryParams, V> queryParams, T key, U mltDocumentValue) { Flux>> mltDocumentFields @@ -197,7 +176,7 @@ public class LuceneIndexImpl implements LuceneIndex { * @return the collection has one or more flux */ @Override - public Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, + public Mono> moreLikeThisWithValues(ClientQueryParams, V> queryParams, T key, U mltDocumentValue, ValueGetter valueGetter) { @@ -226,7 +205,7 @@ public class LuceneIndexImpl implements LuceneIndex { * @return the collection has one or more flux */ @Override - public Mono> search(ClientQueryParams> queryParams) { + public Mono> search(ClientQueryParams, V> queryParams) { return luceneIndex .search(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), @@ -248,7 +227,7 @@ public class LuceneIndexImpl implements LuceneIndex { * @return the collection has one or more flux */ @Override - public Mono> searchWithValues(ClientQueryParams> queryParams, + public Mono> searchWithValues(ClientQueryParams, V> queryParams, ValueGetter valueGetter) { return luceneIndex .search(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName()) @@ -263,7 +242,7 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { - return this.search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) + return this.search(ClientQueryParams., Object>builder().snapshot(snapshot).query(query).limit(0).build()) .map(SearchResultKeys::totalHitsCount); } diff --git a/src/main/java/it/cavallium/dbengine/client/MultiSort.java b/src/main/java/it/cavallium/dbengine/client/MultiSort.java index ec88865..376ce74 100644 --- a/src/main/java/it/cavallium/dbengine/client/MultiSort.java +++ b/src/main/java/it/cavallium/dbengine/client/MultiSort.java @@ -6,85 +6,104 @@ import it.cavallium.dbengine.client.query.current.data.ScoreSort; import it.cavallium.dbengine.client.query.current.data.Sort; import it.cavallium.dbengine.database.LLKeyScore; import java.util.Comparator; +import java.util.function.Function; import java.util.function.ToIntFunction; import java.util.function.ToLongFunction; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; -public class MultiSort { +public class MultiSort { private final Sort querySort; - private final Comparator resultSort; + @NotNull + private final Function> transformer; + private final Comparator resultSort; - public MultiSort(Sort querySort, Comparator resultSort) { + public MultiSort(Sort querySort, Function> transformer, Comparator resultSort) { this.querySort = querySort; + this.transformer = transformer; this.resultSort = resultSort; } /** * Sort a lucene field and the results by a numeric sort field and an int value * @param fieldName Lucene SortedNumericSortField field name + * @param transformer Transform a value to a comparable value asynchronously * @param toIntFunction function to retrieve the integer value of each result * @param reverse descending sort * @param result type * @return MultiSort object */ - public static MultiSort sortedNumericInt(String fieldName, ToIntFunction toIntFunction, boolean reverse) { + public static MultiSort sortedNumericInt(String fieldName, + Function> transformer, + ToIntFunction toIntFunction, + boolean reverse) { // Create lucene sort Sort querySort = NumericSort.of(fieldName, reverse); // Create result sort - Comparator resultSort = Comparator.comparingInt(toIntFunction); + Comparator resultSort = Comparator.comparingInt(toIntFunction); if (reverse) { resultSort = resultSort.reversed(); } // Return the multi sort - return new MultiSort<>(querySort, resultSort); + return new MultiSort<>(querySort, transformer, resultSort); } /** * Sort a lucene field and the results by a numeric sort field and an long value * @param fieldName Lucene SortedNumericSortField field name + * @param transformer Transform a value to a comparable value asynchronously * @param toLongFunction function to retrieve the long value of each result * @param reverse descending sort * @param result type * @return MultiSort object */ - public static MultiSort sortedNumericLong(String fieldName, ToLongFunction toLongFunction, boolean reverse) { + public static MultiSort sortedNumericLong(String fieldName, + Function> transformer, + ToLongFunction toLongFunction, + boolean reverse) { // Create lucene sort Sort querySort = NumericSort.of(fieldName, reverse); // Create result sort - Comparator resultSort = Comparator.comparingLong(toLongFunction); + Comparator resultSort = Comparator.comparingLong(toLongFunction); if (!reverse) { resultSort = resultSort.reversed(); } // Return the multi sort - return new MultiSort<>(querySort, resultSort); + return new MultiSort<>(querySort, transformer, resultSort); } - public static MultiSort randomSortField() { - return new MultiSort<>(RandomSort.of(), (a, b) -> 0); + public static MultiSort randomSortField() { + return new MultiSort<>(RandomSort.of(), Mono::just, (a, b) -> 0); } - public static MultiSort topScoreRaw() { - Comparator comp = Comparator.comparingDouble(LLKeyScore::getScore).reversed(); - return new MultiSort<>(ScoreSort.of(), comp); + public static MultiSort topScoreRaw() { + Comparator comp = Comparator.comparingDouble(LLKeyScore::score).reversed(); + return new MultiSort<>(ScoreSort.of(), Mono::just, comp); } - public static MultiSort> topScore() { - return new MultiSort<>(ScoreSort.of(), Comparator.>comparingDouble(SearchResultKey::getScore).reversed()); + public static MultiSort, SearchResultKey> topScore() { + return new MultiSort<>(ScoreSort.of(), Mono::just, Comparator.>comparingDouble(SearchResultKey::score).reversed()); } - public static MultiSort> topScoreWithValues() { - return new MultiSort<>(ScoreSort.of(), Comparator.>comparingDouble(SearchResultItem::getScore).reversed()); + public static MultiSort, SearchResultItem> topScoreWithValues() { + return new MultiSort<>(ScoreSort.of(), Mono::just, Comparator.>comparingDouble(SearchResultItem::score).reversed()); } public Sort getQuerySort() { return querySort; } - public Comparator getResultSort() { + @NotNull + public Function> getTransformer() { + return transformer; + } + + public Comparator getResultSort() { return resultSort; } } diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultItem.java b/src/main/java/it/cavallium/dbengine/client/SearchResultItem.java index e07d6d3..aac0e12 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResultItem.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResultItem.java @@ -1,57 +1,10 @@ package it.cavallium.dbengine.client; -import java.util.Objects; -import java.util.StringJoiner; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; -public class SearchResultItem implements Comparable> { - private final T key; - private final U value; - private final float score; - - public SearchResultItem(T key, U value, float score) { - this.key = key; - this.value = value; - this.score = score; - } - - public float getScore() { - return score; - } - - public T getKey() { - return key; - } - - public U getValue() { - return value; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SearchResultItem that = (SearchResultItem) o; - return Float.compare(that.score, score) == 0 && Objects.equals(key, that.key) && Objects.equals(value, that.value); - } - - @Override - public int hashCode() { - return Objects.hash(key, value, score); - } - - @Override - public String toString() { - return new StringJoiner(", ", SearchResultItem.class.getSimpleName() + "[", "]") - .add("key=" + key) - .add("value=" + value) - .add("score=" + score) - .toString(); - } +public record SearchResultItem(Mono key, Mono value, float score) + implements Comparable> { @Override public int compareTo(@NotNull SearchResultItem o) { diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultKey.java b/src/main/java/it/cavallium/dbengine/client/SearchResultKey.java index 2c0cb3c..6193f3d 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResultKey.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResultKey.java @@ -2,46 +2,6 @@ package it.cavallium.dbengine.client; import java.util.Objects; import java.util.StringJoiner; +import reactor.core.publisher.Mono; -public class SearchResultKey { - private final T key; - private final float score; - - public SearchResultKey(T key, float score) { - this.key = key; - this.score = score; - } - - public float getScore() { - return score; - } - - public T getKey() { - return key; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SearchResultKey that = (SearchResultKey) o; - return Float.compare(that.score, score) == 0 && Objects.equals(key, that.key); - } - - @Override - public int hashCode() { - return Objects.hash(key, score); - } - - @Override - public String toString() { - return new StringJoiner(", ", SearchResultKey.class.getSimpleName() + "[", "]") - .add("key=" + key) - .add("score=" + score) - .toString(); - } -} +public record SearchResultKey(Mono key, float score) {} diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java index 07f4b1b..326ec30 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java @@ -11,11 +11,9 @@ public record SearchResultKeys(Flux> results, long totalHi } public SearchResult withValues(ValueGetter valuesGetter) { - return new SearchResult<>( - results.flatMapSequential(item -> valuesGetter - .get(item.getKey()) - .map(value -> new SearchResultItem<>(item.getKey(), value, item.getScore()))), - totalHitsCount - ); + return new SearchResult<>(results.map(item -> new SearchResultItem<>(item.key(), + item.key().flatMap(valuesGetter::get), + item.score() + )), totalHitsCount); } } diff --git a/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java b/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java index da662cf..231a0b1 100644 --- a/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java +++ b/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java @@ -14,17 +14,17 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @RecordBuilder -public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot, +public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot, @NotNull Query query, long offset, long limit, @Nullable Float minCompetitiveScore, - @Nullable MultiSort sort, + @Nullable MultiSort sort, @NotNull LLScoreMode scoreMode) { - public static ClientQueryParamsBuilder builder() { + public static ClientQueryParamsBuilder builder() { return ClientQueryParamsBuilder - .builder() + .builder() .snapshot(null) .offset(0) .limit(Long.MAX_VALUE) diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java b/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java index 41bdc57..0c0d28e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java @@ -1,24 +1,10 @@ package it.cavallium.dbengine.database; import java.util.Objects; +import java.util.StringJoiner; +import reactor.core.publisher.Mono; -public class LLKeyScore { - - private final String key; - private final float score; - - public LLKeyScore(String key, float score) { - this.key = key; - this.score = score; - } - - public String getKey() { - return key; - } - - public float getScore() { - return score; - } +public record LLKeyScore(int docId, float score, Mono key) { @Override public boolean equals(Object o) { @@ -29,20 +15,19 @@ public class LLKeyScore { return false; } LLKeyScore that = (LLKeyScore) o; - return score == that.score && - Objects.equals(key, that.key); + return docId == that.docId && Float.compare(that.score, score) == 0; } @Override public int hashCode() { - return Objects.hash(key, score); + return Objects.hash(docId, score); } @Override public String toString() { - return "LLKeyScore{" + - "key=" + key + - ", score=" + score + - '}'; + return new StringJoiner(", ", LLKeyScore.class.getSimpleName() + "[", "]") + .add("docId=" + docId) + .add("score=" + score) + .toString(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 98718a2..5e59b9b 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -8,20 +8,16 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.IllegalReferenceCountException; -import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.lucene.RandomSortField; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; import java.util.Objects; import java.util.function.Function; import java.util.function.ToIntFunction; -import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.FloatPoint; @@ -167,7 +163,7 @@ public class LLUtils { } public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) { - return new it.cavallium.dbengine.database.LLKeyScore(hit.getKey(), hit.getScore()); + return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.score(), hit.key()); } public static String toStringSafe(ByteBuf key) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 00428d5..638a30d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -25,7 +25,6 @@ import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher; import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneReactiveSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher; -import it.cavallium.dbengine.lucene.searcher.SortedPagedLuceneReactiveSearcher; import java.io.IOException; import java.nio.file.Path; import java.time.Duration; @@ -36,7 +35,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.index.ConcurrentMergeScheduler; @@ -68,7 +66,6 @@ import org.apache.lucene.util.Constants; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; -import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -554,7 +551,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { + " {}. You must use a similarity instance based on TFIDFSimilarity!", similarity); } - // Get the reference doc and apply it to MoreLikeThis, to generate the query + // Get the reference docId and apply it to MoreLikeThis, to generate the query var mltQuery = mlt.like((Map) mltDocumentFields); Query luceneQuery; if (luceneAdditionalQuery != null) { @@ -587,7 +584,8 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } private LLKeyScore fixKeyScore(LLKeyScore keyScore, int scoreDivisor) { - return scoreDivisor == 1 ? keyScore : new LLKeyScore(keyScore.getKey(), keyScore.getScore() / (float) scoreDivisor); + return scoreDivisor == 1 ? keyScore + : new LLKeyScore(keyScore.docId(), keyScore.score() / (float) scoreDivisor, keyScore.key()); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 90a378a..78ddef0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -57,6 +57,8 @@ import org.novasearch.lucene.search.similarities.RobertsonSimilarity; import org.warp.commonutils.log.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; public class LuceneUtils { private static final Analyzer lucene4GramWordsAnalyzerEdgeInstance = new NCharGramEdgeAnalyzer(true, 4, 4); @@ -161,8 +163,9 @@ public class LuceneUtils { /** * Merge streams together maintaining absolute order */ - public static Flux mergeStream(Flux> mappedMultiResults, - @Nullable MultiSort sort, + @SuppressWarnings({"unchecked"}) + public static Flux mergeStream(Flux> mappedMultiResults, + @Nullable MultiSort sort, long offset, @Nullable Long limit) { if (limit != null && limit == 0) { @@ -173,8 +176,15 @@ public class LuceneUtils { if (sort == null) { mergedFlux = Flux.merge(mappedMultiResultsList); } else { - //noinspection unchecked - mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new)); + mergedFlux = Flux + .mergeOrdered(32, + (a, b) -> sort.getResultSort().compare(a.getT2(), b.getT2()), + (Flux>[]) mappedMultiResultsList.stream() + .map(flux -> flux.flatMapSequential(entry -> sort.getTransformer().apply(entry) + .map(transformed -> Tuples.of(entry, transformed)))) + .toArray(Flux[]::new) + ) + .map(Tuple2::getT1); } Flux offsetedFlux; if (offset > 0) { @@ -214,7 +224,8 @@ public class LuceneUtils { if (field == null) { logger.error("Can't get key of document docId: {}, score: {}", docId, score); } else { - if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) { + if (resultsConsumer.accept(new LLKeyScore(docId, score, Mono.just(field.stringValue()))) + == HandleResult.HALT) { return HandleResult.HALT; } } @@ -225,75 +236,71 @@ public class LuceneUtils { /** * - * @return the key score, or null if the result is not relevant - * @throws IOException if an error occurs + * @return false if the result is not relevant */ @Nullable - public static LLKeyScore collectTopDoc(Logger logger, int docId, float score, Float minCompetitiveScore, - IndexSearcher indexSearcher, String keyFieldName) throws IOException { - if (minCompetitiveScore == null || score >= minCompetitiveScore) { - Document d = indexSearcher.doc(docId, Set.of(keyFieldName)); - if (d.getFields().isEmpty()) { - StringBuilder sb = new StringBuilder(); - sb.append("The document docId: ").append(docId).append(", score: ").append(score).append(" is empty."); - var realFields = indexSearcher.doc(docId).getFields(); - if (!realFields.isEmpty()) { - sb.append("\n"); - logger.error("Present fields:\n"); - boolean first = true; - for (IndexableField field : realFields) { - if (first) { - first = false; - } else { - sb.append("\n"); - } - sb.append(" - ").append(field.name()); + public static boolean filterTopDoc(float score, Float minCompetitiveScore) { + return minCompetitiveScore == null || score >= minCompetitiveScore; + } + + @Nullable + public static String keyOfTopDoc(Logger logger, int docId, IndexSearcher indexSearcher, + String keyFieldName) throws IOException { + Document d = indexSearcher.doc(docId, Set.of(keyFieldName)); + if (d.getFields().isEmpty()) { + StringBuilder sb = new StringBuilder(); + sb.append("The document docId: ").append(docId).append(" is empty."); + var realFields = indexSearcher.doc(docId).getFields(); + if (!realFields.isEmpty()) { + sb.append("\n"); + logger.error("Present fields:\n"); + boolean first = true; + for (IndexableField field : realFields) { + if (first) { + first = false; + } else { + sb.append("\n"); } - } - throw new IOException(sb.toString()); - } else { - var field = d.getField(keyFieldName); - if (field == null) { - throw new IOException("Can't get key of document docId: " + docId + ", score: " + score); - } else { - return new LLKeyScore(field.stringValue(), score); + sb.append(" - ").append(field.name()); } } + throw new IOException(sb.toString()); } else { - return null; + var field = d.getField(keyFieldName); + if (field == null) { + throw new IOException("Can't get key of document docId: " + docId); + } else { + return field.stringValue(); + } } } - public static Mono> mergeSignalStreamKeys(Flux> mappedKeys, - MultiSort> sort, + public static Mono> mergeSignalStreamKeys(Flux> mappedKeys, + MultiSort, V> sort, long offset, Long limit) { return mappedKeys.reduce( new SearchResultKeys<>(Flux.empty(), 0L), (a, b) -> new SearchResultKeys<>(LuceneUtils.mergeStream(Flux.just(a.results(), b.results()), - sort, - offset, - limit + sort, offset, limit ), a.totalHitsCount() + b.totalHitsCount()) ); } - public static Mono> mergeSignalStreamItems(Flux> mappedKeys, - MultiSort> sort, + public static Mono> mergeSignalStreamItems(Flux> mappedKeys, + MultiSort, V> sort, long offset, Long limit) { return mappedKeys.reduce( new SearchResult<>(Flux.empty(), 0L), (a, b) -> new SearchResult<>(LuceneUtils.mergeStream(Flux.just(a.results(), b.results()), - sort, - offset, - limit + sort, offset, limit ), a.totalHitsCount() + b.totalHitsCount()) ); } public static Mono mergeSignalStreamRaw(Flux mappedKeys, - MultiSort mappedSort, + MultiSort mappedSort, long offset, Long limit) { return mappedKeys.reduce( diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveReactiveSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveReactiveSearcher.java index c049777..eb6a2e1 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveReactiveSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveReactiveSearcher.java @@ -10,10 +10,11 @@ import reactor.core.scheduler.Scheduler; public class AdaptiveReactiveSearcher implements LuceneReactiveSearcher { + public static final int PAGED_THRESHOLD = 1000; private static final LuceneReactiveSearcher count = new CountLuceneReactiveSearcher(); - private static final LuceneReactiveSearcher sortedPaged = new SortedPagedLuceneReactiveSearcher(); - private static final LuceneReactiveSearcher unsortedPaged = new UnsortedPagedLuceneReactiveSearcher(); + private static final LuceneReactiveSearcher paged = new PagedLuceneReactiveSearcher(); + private static final LuceneReactiveSearcher simple = new SimpleLuceneReactiveSearcher(); @Override public Mono search(IndexSearcher indexSearcher, @@ -37,8 +38,8 @@ public class AdaptiveReactiveSearcher implements LuceneReactiveSearcher { scheduler ); } - if (luceneSort != null) { - return sortedPaged.search(indexSearcher, + if (offset + limit > PAGED_THRESHOLD) { + return paged.search(indexSearcher, query, offset, limit, @@ -49,7 +50,7 @@ public class AdaptiveReactiveSearcher implements LuceneReactiveSearcher { scheduler ); } else { - return unsortedPaged.search(indexSearcher, + return simple.search(indexSearcher, query, offset, limit, diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneReactiveSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneReactiveSearcher.java index 80add67..9bb739f 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneReactiveSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneReactiveSearcher.java @@ -1,13 +1,20 @@ package it.cavallium.dbengine.lucene.searcher; +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -36,4 +43,27 @@ public interface LuceneReactiveSearcher { @Nullable Float minCompetitiveScore, String keyFieldName, Scheduler scheduler); + + static Flux convertHits( + ScoreDoc[] hits, + IndexSearcher indexSearcher, + @Nullable Float minCompetitiveScore, + String keyFieldName, + Scheduler scheduler) { + return Flux + .fromArray(hits) + .map(hit -> { + int shardDocId = hit.doc; + float score = hit.score; + var keyMono = Mono.fromCallable(() -> { + if (!LuceneUtils.filterTopDoc(score, minCompetitiveScore)) { + return null; + } + //noinspection BlockingMethodInNonBlockingContext + @Nullable String collectedDoc = LuceneUtils.keyOfTopDoc(logger, shardDocId, indexSearcher, keyFieldName); + return collectedDoc; + }).subscribeOn(scheduler); + return new LLKeyScore(shardDocId, score, keyMono); + }); + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLuceneReactiveSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLuceneReactiveSearcher.java new file mode 100644 index 0000000..f281a59 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedLuceneReactiveSearcher.java @@ -0,0 +1,151 @@ +package it.cavallium.dbengine.lucene.searcher; + +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldDocs; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; + +public class PagedLuceneReactiveSearcher implements LuceneReactiveSearcher { + + private static final int FIRST_PAGE_HITS_MAX_COUNT = 10; + private static final long MIN_HITS_PER_PAGE = 20; + private static final long MAX_HITS_PER_PAGE = 1000; + + @SuppressWarnings("BlockingMethodInNonBlockingContext") + @Override + public Mono search(IndexSearcher indexSearcher, + Query query, + int offset, + int limit, + @Nullable Sort luceneSort, + ScoreMode scoreMode, + @Nullable Float minCompetitiveScore, + String keyFieldName, + Scheduler scheduler) { + // todo: check if offset and limit play well together. + // check especially these cases: + // - offset > limit + // - offset > FIRST_PAGE_HITS_MAX_COUNT + // - offset > MAX_HITS_PER_PAGE + return Mono + .fromCallable(() -> { + // Run the first page search + TopDocs firstTopDocsVal; + if (offset == 0) { + if (luceneSort != null) { + firstTopDocsVal = indexSearcher.search(query, + FIRST_PAGE_HITS_MAX_COUNT, + luceneSort, + scoreMode != ScoreMode.COMPLETE_NO_SCORES + ); + } else { + firstTopDocsVal = indexSearcher.search(query, + FIRST_PAGE_HITS_MAX_COUNT + ); + } + } else { + firstTopDocsVal = TopDocsSearcher.getTopDocs(indexSearcher, + query, + luceneSort, + FIRST_PAGE_HITS_MAX_COUNT, + null, + scoreMode != ScoreMode.COMPLETE_NO_SCORES, + 1000, + offset, FIRST_PAGE_HITS_MAX_COUNT); + } + long totalHitsCount = firstTopDocsVal.totalHits.value; + Flux firstPageHitsFlux = LuceneReactiveSearcher.convertHits( + firstTopDocsVal.scoreDocs, + indexSearcher, + minCompetitiveScore, + keyFieldName, + scheduler + ); + + Flux nextPagesFlux = Flux + ., PageState>generate( + () -> new PageState(getLastItem(firstTopDocsVal.scoreDocs), 0), + (s, sink) -> { + if (s.lastItem() == null) { + sink.complete(); + return new PageState(null, 0); + } + + try { + TopDocs lastTopDocs; + if (luceneSort != null) { + lastTopDocs = indexSearcher.searchAfter(s.lastItem(), + query, + s.hitsPerPage(), + luceneSort, + scoreMode != ScoreMode.COMPLETE_NO_SCORES + ); + } else { + lastTopDocs = indexSearcher.searchAfter(s.lastItem(), + query, + s.hitsPerPage() + ); + } + if (lastTopDocs.scoreDocs.length > 0) { + ScoreDoc lastItem = getLastItem(lastTopDocs.scoreDocs); + var hitsList = LuceneReactiveSearcher.convertHits( + lastTopDocs.scoreDocs, + indexSearcher, + minCompetitiveScore, + keyFieldName, + scheduler + ); + sink.next(hitsList); + return new PageState(lastItem, s.currentPageIndex() + 1); + } else { + sink.complete(); + return new PageState(null, 0); + } + } catch (IOException e) { + sink.error(e); + return new PageState(null, 0); + } + } + ) + .subscribeOn(scheduler) + .concatMap(Flux::hide); + + Flux resultsFlux = firstPageHitsFlux + .concatWith(nextPagesFlux) + .take(limit, true); + + + if (limit == 0) { + return new LuceneReactiveSearchInstance(totalHitsCount, Flux.empty()); + } else { + return new LuceneReactiveSearchInstance(totalHitsCount, resultsFlux); + } + }) + .subscribeOn(scheduler); + } + + private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) { + return scoreDocs[scoreDocs.length - 1]; + } + + private record PageState(ScoreDoc lastItem, int currentPageIndex) { + + public int hitsPerPage() { + return (int) Math.min(MAX_HITS_PER_PAGE, MIN_HITS_PER_PAGE * (1L << currentPageIndex)); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java index 99e4bb1..35a8140 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java @@ -13,6 +13,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; /** * Unsorted search (low latency and constant memory usage) @@ -74,7 +75,8 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher { if (field == null) { logger.error("Can't get key of document docId: {}", docId); } else { - if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) { + if (resultsConsumer.accept(new LLKeyScore(docId, score, Mono.just(field.stringValue()))) + == HandleResult.HALT) { return HandleResult.HALT; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneReactiveSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneReactiveSearcher.java new file mode 100644 index 0000000..ce676b5 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneReactiveSearcher.java @@ -0,0 +1,62 @@ +package it.cavallium.dbengine.lucene.searcher; + +import it.cavallium.dbengine.database.LLKeyScore; +import it.cavallium.dbengine.lucene.LuceneUtils; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldDocs; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; + +public class SimpleLuceneReactiveSearcher implements LuceneReactiveSearcher { + + @Override + public Mono search(IndexSearcher indexSearcher, + Query query, + int offset, + int limit, + @Nullable Sort luceneSort, + ScoreMode scoreMode, + @Nullable Float minCompetitiveScore, + String keyFieldName, + Scheduler scheduler) { + return Mono + .fromCallable(() -> { + TopDocs topDocs; + if (luceneSort == null) { + //noinspection BlockingMethodInNonBlockingContext + topDocs = indexSearcher.search(query, + offset + limit + ); + } else { + //noinspection BlockingMethodInNonBlockingContext + topDocs = indexSearcher.search(query, + offset + limit, + luceneSort, + scoreMode != ScoreMode.COMPLETE_NO_SCORES + ); + } + Flux hitsMono = LuceneReactiveSearcher + .convertHits( + topDocs.scoreDocs, + indexSearcher, + minCompetitiveScore, + keyFieldName, + scheduler + ) + .take(limit, true); + return new LuceneReactiveSearchInstance(topDocs.totalHits.value, hitsMono); + }) + .subscribeOn(scheduler); + } +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedPagedLuceneReactiveSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedPagedLuceneReactiveSearcher.java deleted file mode 100644 index a7e1562..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SortedPagedLuceneReactiveSearcher.java +++ /dev/null @@ -1,164 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import it.cavallium.dbengine.database.LLKeyScore; -import it.cavallium.dbengine.lucene.LuceneUtils; -import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TopDocs; -import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; - -public class SortedPagedLuceneReactiveSearcher implements LuceneReactiveSearcher { - - private static final int FIRST_PAGE_HITS_MAX_COUNT = 10; - private static final long MIN_HITS_PER_PAGE = 20; - private static final long MAX_HITS_PER_PAGE = 1000; - - @SuppressWarnings("BlockingMethodInNonBlockingContext") - @Override - public Mono search(IndexSearcher indexSearcher, - Query query, - int offset, - int limit, - @Nullable Sort luceneSort, - ScoreMode scoreMode, - @Nullable Float minCompetitiveScore, - String keyFieldName, - Scheduler scheduler) { - if (luceneSort == null) { - return Mono.error(new IllegalArgumentException("Can't execute unsorted queries")); - } - // todo: check if offset and limit play well together. - // check especially these cases: - // - offset > limit - // - offset > FIRST_PAGE_HITS_MAX_COUNT - // - offset > MAX_HITS_PER_PAGE - return Mono - .fromCallable(() -> { - // Run the first page (max 1 item) search - TopDocs firstTopDocsVal; - if (offset == 0) { - firstTopDocsVal = indexSearcher.search(query, - FIRST_PAGE_HITS_MAX_COUNT, - luceneSort, - scoreMode != ScoreMode.COMPLETE_NO_SCORES - ); - } else { - firstTopDocsVal = TopDocsSearcher.getTopDocs(indexSearcher, - query, - luceneSort, - FIRST_PAGE_HITS_MAX_COUNT, - null, - scoreMode != ScoreMode.COMPLETE_NO_SCORES, - 1000, - offset, FIRST_PAGE_HITS_MAX_COUNT); - } - long totalHitsCount = firstTopDocsVal.totalHits.value; - Mono> firstPageHitsMono = Mono - .fromCallable(() -> convertHits(FIRST_PAGE_HITS_MAX_COUNT, firstTopDocsVal.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName)) - .single(); - Flux resultsFlux = firstPageHitsMono.flatMapMany(firstPageHits -> { - int firstPageHitsCount = firstPageHits.size(); - Flux firstPageHitsFlux = Flux.fromIterable(firstPageHits); - if (firstPageHitsCount < FIRST_PAGE_HITS_MAX_COUNT) { - return Flux.fromIterable(firstPageHits); - } else { - Flux nextPagesFlux = Flux - ., PageState>generate( - () -> new PageState(getLastItem(firstTopDocsVal.scoreDocs), 0, limit - firstPageHitsCount), - (s, sink) -> { - if (s.lastItem() == null || s.remainingLimit() <= 0) { - sink.complete(); - return new PageState(null, 0,0); - } - - try { - var lastTopDocs = indexSearcher.searchAfter(s.lastItem(), - query, - s.hitsPerPage(), - luceneSort, - scoreMode != ScoreMode.COMPLETE_NO_SCORES - ); - if (lastTopDocs.scoreDocs.length > 0) { - ScoreDoc lastItem = getLastItem(lastTopDocs.scoreDocs); - var hitsList = convertHits(s.remainingLimit(), - lastTopDocs.scoreDocs, - indexSearcher, - minCompetitiveScore, - keyFieldName - ); - sink.next(hitsList); - if (hitsList.size() < s.hitsPerPage()) { - return new PageState(lastItem, 0, 0); - } else { - return new PageState(lastItem, s.currentPageIndex() + 1, s.remainingLimit() - hitsList.size()); - } - } else { - sink.complete(); - return new PageState(null, 0, 0); - } - } catch (IOException e) { - sink.error(e); - return new PageState(null, 0, 0); - } - } - ) - .subscribeOn(scheduler) - .flatMap(Flux::fromIterable); - return Flux.concat(firstPageHitsFlux, nextPagesFlux); - } - }); - - if (limit == 0) { - return new LuceneReactiveSearchInstance(totalHitsCount, Flux.empty()); - } else { - return new LuceneReactiveSearchInstance(totalHitsCount, resultsFlux); - } - }) - .subscribeOn(scheduler); - } - - - private List convertHits(int currentAllowedResults, - ScoreDoc[] hits, - IndexSearcher indexSearcher, - @Nullable Float minCompetitiveScore, - String keyFieldName) throws IOException { - ArrayList collectedResults = new ArrayList<>(hits.length); - for (ScoreDoc hit : hits) { - int docId = hit.doc; - float score = hit.score; - - if (currentAllowedResults-- > 0) { - @Nullable LLKeyScore collectedDoc = LuceneUtils.collectTopDoc(logger, docId, score, - minCompetitiveScore, indexSearcher, keyFieldName); - if (collectedDoc != null) { - collectedResults.add(collectedDoc); - } - } else { - break; - } - } - return collectedResults; - } - - private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) { - return scoreDocs[scoreDocs.length - 1]; - } - - private record PageState(ScoreDoc lastItem, int currentPageIndex, int remainingLimit) { - - public int hitsPerPage() { - return (int) Math.min(MAX_HITS_PER_PAGE, MIN_HITS_PER_PAGE * (1L << currentPageIndex)); - } - } -} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedPagedLuceneReactiveSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedPagedLuceneReactiveSearcher.java deleted file mode 100644 index 735721b..0000000 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedPagedLuceneReactiveSearcher.java +++ /dev/null @@ -1,154 +0,0 @@ -package it.cavallium.dbengine.lucene.searcher; - -import it.cavallium.dbengine.database.LLKeyScore; -import it.cavallium.dbengine.lucene.LuceneUtils; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TopDocs; -import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; - -public class UnsortedPagedLuceneReactiveSearcher implements LuceneReactiveSearcher { - - private static final int FIRST_PAGE_HITS_MAX_COUNT = 10; - private static final long MIN_HITS_PER_PAGE = 20; - private static final long MAX_HITS_PER_PAGE = 1000; - - @SuppressWarnings("BlockingMethodInNonBlockingContext") - @Override - public Mono search(IndexSearcher indexSearcher, - Query query, - int offset, - int limit, - @Nullable Sort luceneSort, - ScoreMode scoreMode, - @Nullable Float minCompetitiveScore, - String keyFieldName, - Scheduler scheduler) { - if (luceneSort != null) { - return Mono.error(new IllegalArgumentException("Can't search sorted queries")); - } - // todo: check if offset and limit play well together. - // check especially these cases: - // - offset > limit - // - offset > FIRST_PAGE_HITS_MAX_COUNT - // - offset > MAX_HITS_PER_PAGE - return Mono - .fromCallable(() -> { - // Run the first page (max 1 item) search - TopDocs firstTopDocsVal; - if (offset == 0) { - firstTopDocsVal = indexSearcher.search(query, FIRST_PAGE_HITS_MAX_COUNT); - } else { - firstTopDocsVal = TopDocsSearcher.getTopDocs(indexSearcher, - query, - null, - FIRST_PAGE_HITS_MAX_COUNT, - null, - scoreMode != ScoreMode.COMPLETE_NO_SCORES, - 1000, - offset, FIRST_PAGE_HITS_MAX_COUNT); - } - long totalHitsCount = firstTopDocsVal.totalHits.value; - Mono> firstPageHitsMono = Mono - .fromCallable(() -> convertHits(FIRST_PAGE_HITS_MAX_COUNT, firstTopDocsVal.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName)) - .single(); - Flux resultsFlux = firstPageHitsMono.flatMapMany(firstPageHits -> { - int firstPageHitsCount = firstPageHits.size(); - Flux firstPageHitsFlux = Flux.fromIterable(firstPageHits); - if (firstPageHitsCount < FIRST_PAGE_HITS_MAX_COUNT) { - return Flux.fromIterable(firstPageHits); - } else { - Flux nextPagesFlux = Flux - ., PageState>generate( - () -> new PageState(getLastItem(firstTopDocsVal.scoreDocs), 0, limit - firstPageHitsCount), - (s, sink) -> { - if (s.lastItem() == null || s.remainingLimit() <= 0) { - sink.complete(); - return new PageState(null, 0,0); - } - - try { - var lastTopDocs = indexSearcher.searchAfter(s.lastItem(), query, s.hitsPerPage()); - if (lastTopDocs.scoreDocs.length > 0) { - ScoreDoc lastItem = getLastItem(lastTopDocs.scoreDocs); - var hitsList = convertHits(s.remainingLimit(), - lastTopDocs.scoreDocs, - indexSearcher, - minCompetitiveScore, - keyFieldName - ); - sink.next(hitsList); - if (hitsList.size() < s.hitsPerPage()) { - return new PageState(lastItem, 0, 0); - } else { - return new PageState(lastItem, s.currentPageIndex() + 1, s.remainingLimit() - hitsList.size()); - } - } else { - sink.complete(); - return new PageState(null, 0, 0); - } - } catch (IOException e) { - sink.error(e); - return new PageState(null, 0, 0); - } - } - ) - .subscribeOn(scheduler) - .flatMap(Flux::fromIterable); - return Flux.concat(firstPageHitsFlux, nextPagesFlux); - } - }); - - if (limit == 0) { - return new LuceneReactiveSearchInstance(totalHitsCount, Flux.empty()); - } else { - return new LuceneReactiveSearchInstance(totalHitsCount, resultsFlux); - } - }) - .subscribeOn(scheduler); - } - - - private List convertHits(int currentAllowedResults, - ScoreDoc[] hits, - IndexSearcher indexSearcher, - @Nullable Float minCompetitiveScore, - String keyFieldName) throws IOException { - ArrayList collectedResults = new ArrayList<>(hits.length); - for (ScoreDoc hit : hits) { - int docId = hit.doc; - float score = hit.score; - - if (currentAllowedResults-- > 0) { - @Nullable LLKeyScore collectedDoc = LuceneUtils.collectTopDoc(logger, docId, score, - minCompetitiveScore, indexSearcher, keyFieldName); - if (collectedDoc != null) { - collectedResults.add(collectedDoc); - } - } else { - break; - } - } - return collectedResults; - } - - private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) { - return scoreDocs[scoreDocs.length - 1]; - } - - private record PageState(ScoreDoc lastItem, int currentPageIndex, int remainingLimit) { - - public int hitsPerPage() { - return (int) Math.min(MAX_HITS_PER_PAGE, MIN_HITS_PER_PAGE * (1L << currentPageIndex)); - } - } -}