From 918ff710917fb7ca96a1547aa4e40654d00ae39e Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 1 Apr 2021 19:48:25 +0200 Subject: [PATCH] Implement offsets in lucene search --- src/main/data-generator/lucene-query.yaml | 1 + .../dbengine/client/LuceneIndex.java | 39 ++++++- .../client/query/ClientQueryParams.java | 4 + .../dbengine/database/LLLuceneIndex.java | 6 +- .../database/disk/LLLocalLuceneIndex.java | 95 ++++++++-------- .../disk/LLLocalMultiLuceneIndex.java | 11 ++ .../dbengine/lucene/LuceneUtils.java | 22 +++- .../searcher/AdaptiveStreamSearcher.java | 39 +++---- ...lyQueryParsingCollectorStreamSearcher.java | 28 +++-- .../lucene/searcher/CountStreamSearcher.java | 30 +++-- .../lucene/searcher/LuceneSearchInstance.java | 11 ++ .../lucene/searcher/LuceneStreamSearcher.java | 9 +- .../lucene/searcher/PagedStreamSearcher.java | 104 +++++++++++++----- .../ParallelCollectorStreamSearcher.java | 88 +++++++++------ .../lucene/searcher/SimpleStreamSearcher.java | 62 ++++++----- .../lucene/searcher/TopDocsSearcher.java | 45 ++++++++ 16 files changed, 394 insertions(+), 200 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchInstance.java create mode 100644 src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java diff --git a/src/main/data-generator/lucene-query.yaml b/src/main/data-generator/lucene-query.yaml index 7b9a212..78c2f8b 100644 --- a/src/main/data-generator/lucene-query.yaml +++ b/src/main/data-generator/lucene-query.yaml @@ -175,6 +175,7 @@ versions: QueryParams: data: query: Query + offset: long limit: long minCompetitiveScore: -float sort: Sort diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index fee7f02..10a52b4 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.client; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; +import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLScoreMode; import it.cavallium.dbengine.database.LLSearchResult; @@ -78,9 +79,26 @@ public class LuceneIndex implements LLSnapshottable { return luceneIndex.deleteAll(); } + private static QueryParams fixOffset(LLLuceneIndex luceneIndex, QueryParams queryParams) { + if (luceneIndex.supportsOffset()) { + return queryParams; + } else { + return queryParams.setOffset(0); + } + } + + private static long fixTransformOffset(LLLuceneIndex luceneIndex, long offset) { + if (luceneIndex.supportsOffset()) { + return 0; + } else { + return offset; + } + } + private Mono> transformLuceneResult(LLSearchResult llSearchResult, @Nullable MultiSort> sort, LLScoreMode scoreMode, + long offset, @Nullable Long limit) { Flux> mappedKeys = llSearchResult .getResults() @@ -104,12 +122,13 @@ public class LuceneIndex implements LLSnapshottable { } else { mappedSort = null; } - return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, limit); + return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, offset, limit); } private Mono> transformLuceneResultWithValues(LLSearchResult llSearchResult, @Nullable MultiSort> sort, LLScoreMode scoreMode, + long offset, @Nullable Long limit, ValueGetter valueGetter) { Flux> mappedKeys = llSearchResult @@ -135,7 +154,7 @@ public class LuceneIndex implements LLSnapshottable { } else { mappedSort = null; } - return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, limit); + return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, offset, limit); } /** @@ -152,15 +171,17 @@ public class LuceneIndex implements LLSnapshottable { Flux>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex - .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields) + .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields) .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), + fixTransformOffset(luceneIndex, queryParams.getOffset()), queryParams.getLimit() )); } + /** * * @param queryParams the limit is valid for each lucene instance. @@ -177,13 +198,14 @@ public class LuceneIndex implements LLSnapshottable { = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), - queryParams.toQueryParams(), + fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields ) .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), + fixTransformOffset(luceneIndex, queryParams.getOffset()), queryParams.getLimit(), valueGetter )); @@ -199,10 +221,14 @@ public class LuceneIndex implements LLSnapshottable { public Mono> search( ClientQueryParams> queryParams) { return luceneIndex - .search(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()) + .search(resolveSnapshot(queryParams.getSnapshot()), + fixOffset(luceneIndex, queryParams.toQueryParams()), + indicizer.getKeyFieldName() + ) .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), + fixTransformOffset(luceneIndex, queryParams.getOffset()), queryParams.getLimit() )); } @@ -218,10 +244,11 @@ public class LuceneIndex implements LLSnapshottable { ClientQueryParams> queryParams, ValueGetter valueGetter) { return luceneIndex - .search(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()) + .search(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName()) .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, queryParams.getSort(), queryParams.getScoreMode(), + fixTransformOffset(luceneIndex, queryParams.getOffset()), queryParams.getLimit(), valueGetter )); diff --git a/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java b/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java index 62dccb7..a349fcd 100644 --- a/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java +++ b/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java @@ -35,6 +35,9 @@ public final class ClientQueryParams { @NonNull private Query query; + @Default + private long offset = 0; + @Default private long limit = Long.MAX_VALUE; @@ -75,6 +78,7 @@ public final class ClientQueryParams { .query(getQuery()) .sort(getSort() != null ? getSort().getQuerySort() : NoSort.of()) .minCompetitiveScore(Nullablefloat.ofNullable(getMinCompetitiveScore())) + .offset(getOffset()) .limit(getLimit()) .scoreMode(toScoreMode()) .build(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index fe09ad1..c37497f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -49,15 +49,17 @@ public interface LLLuceneIndex extends LLSnapshottable { Mono search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName); default Mono count(@Nullable LLSnapshot snapshot, Query query) { - QueryParams params = QueryParams.of(query, 0, Nullablefloat.empty(), NoSort.of(), ScoreMode.of(false, false)); + QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), ScoreMode.of(false, false)); return Mono.from(this.search(snapshot, params, null) - .flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results.getResults(), null, null)) + .flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results.getResults(), null, 0, null)) .map(LLSearchResultShard::getTotalHitsCount) .defaultIfEmpty(0L)); } boolean isLowMemoryMode(); + boolean supportsOffset(); + Mono close(); /** diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index b2f0e80..d5f57cc 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -19,6 +19,7 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher; import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher; +import it.cavallium.dbengine.lucene.searcher.LuceneSearchInstance; import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult; import java.io.IOException; @@ -451,6 +452,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .subscribeOn(luceneQueryScheduler) .map(luceneQuery -> luceneSearch(doDistributedPre, indexSearcher, + queryParams.getOffset(), queryParams.getLimit(), queryParams.getMinCompetitiveScore().getNullable(), keyFieldName, @@ -510,6 +512,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { return luceneSearch(doDistributedPre, indexSearcher, + queryParams.getOffset(), queryParams.getLimit(), queryParams.getMinCompetitiveScore().getNullable(), keyFieldName, @@ -534,6 +537,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private LLSearchResult luceneSearch(boolean doDistributedPre, IndexSearcher indexSearcher, + long offset, long limit, @Nullable Float minCompetitiveScore, String keyFieldName, @@ -543,26 +547,29 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { ScoreMode luceneScoreMode) { return new LLSearchResult(Mono.create(monoSink -> { + LuceneSearchInstance luceneSearchInstance; long totalHitsCount; try { - if (!doDistributedPre) { - AtomicLong totalHitsCountAtomic = new AtomicLong(0); + if (doDistributedPre) { //noinspection BlockingMethodInNonBlockingContext - streamSearcher.search(indexSearcher, + allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); + monoSink.success(new LLSearchResultShard(Flux.empty(), 0)); + return; + } else { + int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset); + int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); + //noinspection BlockingMethodInNonBlockingContext + luceneSearchInstance = streamSearcher.search(indexSearcher, luceneQuery, - 0, + boundedOffset, + boundedLimit, luceneSort, luceneScoreMode, minCompetitiveScore, - keyFieldName, - keyScore -> HandleResult.HALT, - totalHitsCountAtomic::set + keyFieldName ); - totalHitsCount = totalHitsCountAtomic.get(); - } else { //noinspection BlockingMethodInNonBlockingContext - allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); - totalHitsCount = 0; + totalHitsCount = luceneSearchInstance.getTotalHitsCount(); } } catch (Exception ex) { monoSink.error(ex); @@ -590,43 +597,30 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { }); try { - if (!doDistributedPre) { - int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); - AtomicLong atomicTotalHitsCount = new AtomicLong(0); - //noinspection BlockingMethodInNonBlockingContext - streamSearcher.search(indexSearcher, - luceneQuery, - boundedLimit, - luceneSort, - luceneScoreMode, - minCompetitiveScore, - keyFieldName, - keyScore -> { - try { - if (cancelled.get()) { - return HandleResult.HALT; - } - while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) { - if (cancelled.get()) { - return HandleResult.HALT; - } - } - sink.next(fixKeyScore(keyScore, scoreDivisor)); - if (cancelled.get()) { - return HandleResult.HALT; - } else { - return HandleResult.CONTINUE; - } - } catch (Exception ex) { - sink.error(ex); - cancelled.set(true); - requests.release(Integer.MAX_VALUE); - return HandleResult.HALT; - } - }, - atomicTotalHitsCount::set - ); - } + //noinspection BlockingMethodInNonBlockingContext + luceneSearchInstance.getResults(keyScore -> { + try { + if (cancelled.get()) { + return HandleResult.HALT; + } + while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) { + if (cancelled.get()) { + return HandleResult.HALT; + } + } + sink.next(fixKeyScore(keyScore, scoreDivisor)); + if (cancelled.get()) { + return HandleResult.HALT; + } else { + return HandleResult.CONTINUE; + } + } catch (Exception ex) { + sink.error(ex); + cancelled.set(true); + requests.release(Integer.MAX_VALUE); + return HandleResult.HALT; + } + }); sink.complete(); } catch (Exception ex) { sink.error(ex); @@ -720,4 +714,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { public boolean isLowMemoryMode() { return lowMemory; } + + @Override + public boolean supportsOffset() { + return true; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 2f85bd8..e268c8f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -212,6 +212,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { QueryParams queryParams, String keyFieldName, Flux>> mltDocumentFields) { + if (queryParams.getOffset() != 0) { + return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0")); + } long actionId; int scoreDivisor; Flux>> mltDocumentFieldsShared; @@ -284,6 +287,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { long actionId; int scoreDivisor; Mono distributedPre; + if (queryParams.getOffset() != 0) { + return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0")); + } if (luceneIndices.length <= 1 || !queryParams.getScoreMode().getComputeScores()) { actionId = -1; scoreDivisor = 1; @@ -410,4 +416,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public boolean isLowMemoryMode() { return luceneIndices[0].isLowMemoryMode(); } + + @Override + public boolean supportsOffset() { + return false; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index e21ae23..d117ae7 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -181,6 +181,7 @@ public class LuceneUtils { */ public static Flux mergeStream(Flux> mappedMultiResults, @Nullable MultiSort sort, + long offset, @Nullable Long limit) { if (limit != null && limit == 0) { return mappedMultiResults.flatMap(f -> f).ignoreElements().flux(); @@ -193,10 +194,16 @@ public class LuceneUtils { //noinspection unchecked mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new)); } - if (limit == null || limit == Long.MAX_VALUE) { - return mergedFlux; + Flux offsetedFlux; + if (offset > 0) { + offsetedFlux = mergedFlux.skip(offset); } else { - return mergedFlux.limitRequest(limit); + offsetedFlux = mergedFlux; + } + if (limit == null || limit == Long.MAX_VALUE) { + return offsetedFlux; + } else { + return offsetedFlux.limitRequest(limit); } }); } @@ -236,31 +243,34 @@ public class LuceneUtils { public static Mono> mergeSignalStreamKeys(Flux> mappedKeys, MultiSort> sort, + long offset, Long limit) { return mappedKeys.reduce( new SearchResultKeys<>(Flux.empty(), 0L), (a, b) -> new SearchResultKeys(LuceneUtils - .mergeStream(Flux.just(a.getResults(), b.getResults()), sort, limit), a.getTotalHitsCount() + b.getTotalHitsCount()) + .mergeStream(Flux.just(a.getResults(), b.getResults()), sort, offset, limit), a.getTotalHitsCount() + b.getTotalHitsCount()) ); } public static Mono> mergeSignalStreamItems(Flux> mappedKeys, MultiSort> sort, + long offset, Long limit) { return mappedKeys.reduce( new SearchResult<>(Flux.empty(), 0L), (a, b) -> new SearchResult(LuceneUtils - .mergeStream(Flux.just(a.getResults(), b.getResults()), sort, limit), a.getTotalHitsCount() + b.getTotalHitsCount()) + .mergeStream(Flux.just(a.getResults(), b.getResults()), sort, offset, limit), a.getTotalHitsCount() + b.getTotalHitsCount()) ); } public static Mono mergeSignalStreamRaw(Flux mappedKeys, MultiSort mappedSort, + long offset, Long limit) { return mappedKeys.reduce( new LLSearchResultShard(Flux.empty(), 0), (s1, s2) -> new LLSearchResultShard( - LuceneUtils.mergeStream(Flux.just(s1.getResults(), s2.getResults()), mappedSort, limit), + LuceneUtils.mergeStream(Flux.just(s1.getResults(), s2.getResults()), mappedSort, offset, limit), s1.getTotalHitsCount() + s2.getTotalHitsCount() ) ); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveStreamSearcher.java index b353785..728280c 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AdaptiveStreamSearcher.java @@ -1,7 +1,6 @@ package it.cavallium.dbengine.lucene.searcher; import java.io.IOException; -import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; @@ -21,56 +20,52 @@ public class AdaptiveStreamSearcher implements LuceneStreamSearcher { public AdaptiveStreamSearcher() { this.simpleStreamSearcher = new SimpleStreamSearcher(); - this.parallelCollectorStreamSearcher = new ParallelCollectorStreamSearcher(); - this.pagedStreamSearcher = new PagedStreamSearcher(simpleStreamSearcher); this.countStreamSearcher = new CountStreamSearcher(); + this.parallelCollectorStreamSearcher = new ParallelCollectorStreamSearcher(countStreamSearcher); + this.pagedStreamSearcher = new PagedStreamSearcher(simpleStreamSearcher); } @Override - public void search(IndexSearcher indexSearcher, + public LuceneSearchInstance search(IndexSearcher indexSearcher, Query query, + int offset, int limit, @Nullable Sort luceneSort, ScoreMode scoreMode, @Nullable Float minCompetitiveScore, - String keyFieldName, - ResultItemConsumer consumer, - LongConsumer totalHitsConsumer) throws IOException { + String keyFieldName) throws IOException { if (limit == 0) { - totalHitsConsumer.accept(countStreamSearcher.count(indexSearcher, query)); - } else if (luceneSort == null && ENABLE_PARALLEL_COLLECTOR) { - parallelCollectorStreamSearcher.search(indexSearcher, + return countStreamSearcher.count(indexSearcher, query); + } else if (offset == 0 && luceneSort == null && ENABLE_PARALLEL_COLLECTOR) { + return parallelCollectorStreamSearcher.search(indexSearcher, query, + offset, limit, null, scoreMode, minCompetitiveScore, - keyFieldName, - consumer, - totalHitsConsumer + keyFieldName ); } else { - if (luceneSort != null && limit > PagedStreamSearcher.MAX_ITEMS_PER_PAGE) { - pagedStreamSearcher.search(indexSearcher, + if (offset > 0 || limit > PagedStreamSearcher.MAX_ITEMS_PER_PAGE) { + return pagedStreamSearcher.search(indexSearcher, query, + offset, limit, luceneSort, scoreMode, minCompetitiveScore, - keyFieldName, - consumer, - totalHitsConsumer + keyFieldName ); } else { - simpleStreamSearcher.search(indexSearcher, + return simpleStreamSearcher.search(indexSearcher, query, + offset, limit, luceneSort, scoreMode, minCompetitiveScore, - keyFieldName, - consumer, - totalHitsConsumer + keyFieldName ); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java index 16ba4fd..772666a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java @@ -2,7 +2,6 @@ package it.cavallium.dbengine.lucene.searcher; import java.io.IOException; import java.util.Collection; -import java.util.function.LongConsumer; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.Collector; import org.apache.lucene.search.CollectorManager; @@ -21,19 +20,18 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea public void search(IndexSearcher indexSearcher, Query query) throws IOException { - search(indexSearcher, query, 0, null, null, null, null, null, null); + search(indexSearcher, query, 0, 0, null, null, null, null); } @Override - public void search(IndexSearcher indexSearcher, + public LuceneSearchInstance search(IndexSearcher indexSearcher, Query query, + int offset, int limit, @Nullable Sort luceneSort, ScoreMode scoreMode, @Nullable Float minCompetitiveScore, - String keyFieldName, - ResultItemConsumer resultsConsumer, - LongConsumer totalHitsConsumer) throws IOException { + String keyFieldName) throws IOException { if (limit > 0) { throw new IllegalArgumentException("Limit > 0 not allowed"); } @@ -49,12 +47,6 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea if (keyFieldName != null) { throw new IllegalArgumentException("Key field name not allowed"); } - if (resultsConsumer != null) { - throw new IllegalArgumentException("Results consumer not allowed"); - } - if (totalHitsConsumer != null) { - throw new IllegalArgumentException("Total hits consumer not allowed"); - } indexSearcher.search(query, new CollectorManager<>() { @Override public Collector newCollector() { @@ -85,5 +77,17 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea return null; } }); + + return new LuceneSearchInstance() { + @Override + public long getTotalHitsCount() throws IOException { + throw new IllegalArgumentException("Total hits consumer not allowed"); + } + + @Override + public void getResults(ResultItemConsumer consumer) throws IOException { + throw new IllegalArgumentException("Results consumer not allowed"); + } + }; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountStreamSearcher.java index e1e7521..30e1c5a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountStreamSearcher.java @@ -1,7 +1,6 @@ package it.cavallium.dbengine.lucene.searcher; import java.io.IOException; -import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; @@ -14,31 +13,42 @@ import org.jetbrains.annotations.Nullable; public class CountStreamSearcher implements LuceneStreamSearcher { @Override - public void search(IndexSearcher indexSearcher, + public LuceneSearchInstance search(IndexSearcher indexSearcher, Query query, + int offset, int limit, @Nullable Sort luceneSort, ScoreMode scoreMode, @Nullable Float minCompetitiveScore, - String keyFieldName, - ResultItemConsumer resultsConsumer, - LongConsumer totalHitsConsumer) throws IOException { + String keyFieldName) throws IOException { if (limit != 0) { throw new IllegalArgumentException("CountStream doesn't support a limit different than 0"); } if (luceneSort != null) { throw new IllegalArgumentException("CountStream doesn't support sorting"); } - if (resultsConsumer != null) { - throw new IllegalArgumentException("CountStream doesn't support a results consumer"); - } if (keyFieldName != null) { throw new IllegalArgumentException("CountStream doesn't support a key field"); } - totalHitsConsumer.accept(count(indexSearcher, query)); + return count(indexSearcher, query); } - public long count(IndexSearcher indexSearcher, Query query) throws IOException { + public long countLong(IndexSearcher indexSearcher, Query query) throws IOException { return indexSearcher.count(query); } + + public LuceneSearchInstance count(IndexSearcher indexSearcher, Query query) throws IOException { + long totalHitsCount = countLong(indexSearcher, query); + return new LuceneSearchInstance() { + @Override + public long getTotalHitsCount() { + return totalHitsCount; + } + + @Override + public void getResults(ResultItemConsumer consumer) { + + } + }; + } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchInstance.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchInstance.java new file mode 100644 index 0000000..9c8c7f8 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneSearchInstance.java @@ -0,0 +1,11 @@ +package it.cavallium.dbengine.lucene.searcher; + +import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.ResultItemConsumer; +import java.io.IOException; + +public interface LuceneSearchInstance { + + long getTotalHitsCount() throws IOException; + + void getResults(ResultItemConsumer consumer) throws IOException; +} diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java index d692a75..b34251b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java @@ -2,7 +2,6 @@ package it.cavallium.dbengine.lucene.searcher; import it.cavallium.dbengine.database.LLKeyScore; import java.io.IOException; -import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; @@ -19,6 +18,7 @@ public interface LuceneStreamSearcher { * Do a lucene query, receiving the single results using a consumer * @param indexSearcher the index searcher, which contains all the lucene data * @param query the query + * @param offset the offset of the first result (use 0 to disable offset) * @param limit the maximum number of results * @param luceneSort the sorting method used for the search * @param scoreMode score mode @@ -28,15 +28,14 @@ public interface LuceneStreamSearcher { * @param totalHitsConsumer the consumer of total count of results * @throws IOException thrown if there is an error */ - void search(IndexSearcher indexSearcher, + LuceneSearchInstance search(IndexSearcher indexSearcher, Query query, + int offset, int limit, @Nullable Sort luceneSort, ScoreMode scoreMode, @Nullable Float minCompetitiveScore, - String keyFieldName, - ResultItemConsumer resultsConsumer, - LongConsumer totalHitsConsumer) throws IOException; + String keyFieldName) throws IOException; @FunctionalInterface interface ResultItemConsumer { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedStreamSearcher.java index 2371f8d..15984a9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/PagedStreamSearcher.java @@ -2,7 +2,7 @@ package it.cavallium.dbengine.lucene.searcher; import it.cavallium.dbengine.lucene.LuceneUtils; import java.io.IOException; -import java.util.function.LongConsumer; +import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; @@ -18,58 +18,105 @@ import org.warp.commonutils.type.IntWrapper; public class PagedStreamSearcher implements LuceneStreamSearcher { public static final int MAX_ITEMS_PER_PAGE = 1000; - private final LuceneStreamSearcher baseStreamSearcher; + private final SimpleStreamSearcher simpleStreamSearcher; - public PagedStreamSearcher(LuceneStreamSearcher baseStreamSearcher) { - this.baseStreamSearcher = baseStreamSearcher; + public PagedStreamSearcher(SimpleStreamSearcher simpleStreamSearcher) { + this.simpleStreamSearcher = simpleStreamSearcher; } @Override - public void search(IndexSearcher indexSearcher, + public LuceneSearchInstance search(IndexSearcher indexSearcher, Query query, + int offset, int limit, @Nullable Sort luceneSort, ScoreMode scoreMode, @Nullable Float minCompetitiveScore, - String keyFieldName, - ResultItemConsumer resultsConsumer, - LongConsumer totalHitsConsumer) throws IOException { + String keyFieldName) throws IOException { if (limit < MAX_ITEMS_PER_PAGE) { // Use a normal search method because the limit is low - baseStreamSearcher.search(indexSearcher, + simpleStreamSearcher.search(indexSearcher, query, + offset, limit, luceneSort, scoreMode, minCompetitiveScore, - keyFieldName, - resultsConsumer, - totalHitsConsumer + keyFieldName ); - return; } + IntWrapper currentAllowedResults = new IntWrapper(limit); // Run the first page search - TopDocs lastTopDocs = indexSearcher.search(query, MAX_ITEMS_PER_PAGE, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES); - totalHitsConsumer.accept(lastTopDocs.totalHits.value); - if (lastTopDocs.scoreDocs.length > 0) { - ScoreDoc lastScoreDoc = getLastItem(lastTopDocs.scoreDocs); - consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName, resultsConsumer); + TopDocs firstTopDocsVal; + if (offset == 0) { + firstTopDocsVal = indexSearcher.search(query, + MAX_ITEMS_PER_PAGE, + luceneSort, + scoreMode != ScoreMode.COMPLETE_NO_SCORES + ); + } else { + firstTopDocsVal = new TopDocsSearcher(indexSearcher, + query, + luceneSort, + MAX_ITEMS_PER_PAGE, + null, + scoreMode != ScoreMode.COMPLETE_NO_SCORES, + 1000 + ).getTopDocs(offset, MAX_ITEMS_PER_PAGE); + } + AtomicReference firstTopDocs = new AtomicReference<>(firstTopDocsVal); + long totalHitsCount = firstTopDocs.getPlain().totalHits.value; - // Run the searches for each page until the end - boolean finished = currentAllowedResults.var <= 0; - while (!finished) { - lastTopDocs = indexSearcher.searchAfter(lastScoreDoc, query, MAX_ITEMS_PER_PAGE, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES); + return new LuceneSearchInstance() { + @Override + public long getTotalHitsCount() { + return totalHitsCount; + } + + @Override + public void getResults(ResultItemConsumer resultsConsumer) throws IOException { + TopDocs lastTopDocs = firstTopDocs.getAndSet(null); if (lastTopDocs.scoreDocs.length > 0) { - lastScoreDoc = getLastItem(lastTopDocs.scoreDocs); - consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName, resultsConsumer); - } - if (lastTopDocs.scoreDocs.length < MAX_ITEMS_PER_PAGE || currentAllowedResults.var <= 0) { - finished = true; + ScoreDoc lastScoreDoc = getLastItem(lastTopDocs.scoreDocs); + consumeHits(currentAllowedResults, + lastTopDocs.scoreDocs, + indexSearcher, + minCompetitiveScore, + keyFieldName, + resultsConsumer + ); + + // Run the searches for each page until the end + boolean finished = currentAllowedResults.var <= 0; + while (!finished) { + boolean halted; + lastTopDocs = indexSearcher.searchAfter(lastScoreDoc, + query, + MAX_ITEMS_PER_PAGE, + luceneSort, + scoreMode != ScoreMode.COMPLETE_NO_SCORES + ); + if (lastTopDocs.scoreDocs.length > 0) { + lastScoreDoc = getLastItem(lastTopDocs.scoreDocs); + halted = consumeHits(currentAllowedResults, + lastTopDocs.scoreDocs, + indexSearcher, + minCompetitiveScore, + keyFieldName, + resultsConsumer + ) == HandleResult.HALT; + } else { + halted = false; + } + if (lastTopDocs.scoreDocs.length < MAX_ITEMS_PER_PAGE || currentAllowedResults.var <= 0 || halted) { + finished = true; + } + } } } - } + }; } private HandleResult consumeHits(IntWrapper currentAllowedResults, @@ -103,4 +150,5 @@ public class PagedStreamSearcher implements LuceneStreamSearcher { private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) { return scoreDocs[scoreDocs.length - 1]; } + } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java index 1bdc5bc..0c31fc1 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java @@ -6,7 +6,6 @@ import it.cavallium.dbengine.lucene.LuceneParallelStreamCollectorResult; import java.io.IOException; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.LongConsumer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexableField; import org.apache.lucene.search.IndexSearcher; @@ -20,50 +19,71 @@ import org.jetbrains.annotations.Nullable; */ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher { + private final CountStreamSearcher countStreamSearcher; + + public ParallelCollectorStreamSearcher(CountStreamSearcher countStreamSearcher) { + this.countStreamSearcher = countStreamSearcher; + } + @Override - public void search(IndexSearcher indexSearcher, + public LuceneSearchInstance search(IndexSearcher indexSearcher, Query query, + int offset, int limit, @Nullable Sort luceneSort, ScoreMode scoreMode, @Nullable Float minCompetitiveScore, - String keyFieldName, - ResultItemConsumer resultsConsumer, - LongConsumer totalHitsConsumer) throws IOException { + String keyFieldName) throws IOException { + if (offset != 0) { + throw new IllegalArgumentException("ParallelCollectorStreamSearcher doesn't support a offset different than 0"); + } if (luceneSort != null) { throw new IllegalArgumentException("ParallelCollectorStreamSearcher doesn't support sorted searches"); } - AtomicInteger currentCount = new AtomicInteger(); + return new LuceneSearchInstance() { - LuceneParallelStreamCollectorResult result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, minCompetitiveScore, (docId, score) -> { - if (currentCount.getAndIncrement() >= limit) { - return HandleResult.HALT; - } else { - Document d = indexSearcher.doc(docId, Set.of(keyFieldName)); - if (d.getFields().isEmpty()) { - logger.error("The document docId: {} is empty.", docId); - var realFields = indexSearcher.doc(docId).getFields(); - if (!realFields.isEmpty()) { - logger.error("Present fields:"); - for (IndexableField field : realFields) { - logger.error(" - {}", field.name()); - } - } - } else { - var field = d.getField(keyFieldName); - if (field == null) { - logger.error("Can't get key of document docId: {}", docId); - } else { - if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) { - return HandleResult.HALT; - } - } - } - return HandleResult.CONTINUE; + long totalHitsCount = countStreamSearcher.countLong(indexSearcher, query); + + @Override + public long getTotalHitsCount() throws IOException { + return totalHitsCount; } - })); - //todo: check the accuracy of our hits counter! - totalHitsConsumer.accept(result.getTotalHitsCount()); + + @Override + public void getResults(ResultItemConsumer resultsConsumer) throws IOException { + AtomicInteger currentCount = new AtomicInteger(); + + LuceneParallelStreamCollectorResult result = indexSearcher.search(query, + LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, minCompetitiveScore, (docId, score) -> { + if (currentCount.getAndIncrement() >= limit) { + return HandleResult.HALT; + } else { + Document d = indexSearcher.doc(docId, Set.of(keyFieldName)); + if (d.getFields().isEmpty()) { + logger.error("The document docId: {} is empty.", docId); + var realFields = indexSearcher.doc(docId).getFields(); + if (!realFields.isEmpty()) { + logger.error("Present fields:"); + for (IndexableField field : realFields) { + logger.error(" - {}", field.name()); + } + } + } else { + var field = d.getField(keyFieldName); + if (field == null) { + logger.error("Can't get key of document docId: {}", docId); + } else { + if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) { + return HandleResult.HALT; + } + } + } + return HandleResult.CONTINUE; + } + })); + this.totalHitsCount = result.getTotalHitsCount(); + } + }; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java index 4886202..9e1ac1b 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleStreamSearcher.java @@ -3,13 +3,11 @@ package it.cavallium.dbengine.lucene.searcher; import it.cavallium.dbengine.lucene.LuceneUtils; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; -import java.util.function.LongConsumer; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TopDocs; import org.jetbrains.annotations.Nullable; /** @@ -18,36 +16,46 @@ import org.jetbrains.annotations.Nullable; public class SimpleStreamSearcher implements LuceneStreamSearcher { @Override - public void search(IndexSearcher indexSearcher, + public LuceneSearchInstance search(IndexSearcher indexSearcher, Query query, + int offset, int limit, @Nullable Sort luceneSort, ScoreMode scoreMode, @Nullable Float minCompetitiveScore, - String keyFieldName, - ResultItemConsumer resultsConsumer, - LongConsumer totalHitsConsumer) throws IOException { - TopDocs topDocs; - if (luceneSort != null) { - topDocs = indexSearcher.search(query, limit, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES); - } else { - topDocs = indexSearcher.search(query, limit); - } - totalHitsConsumer.accept(topDocs.totalHits.value); - var hits = ObjectArrayList.wrap(topDocs.scoreDocs); - for (ScoreDoc hit : hits) { - int docId = hit.doc; - float score = hit.score; - if (LuceneUtils.collectTopDoc(logger, - docId, - score, - minCompetitiveScore, - indexSearcher, - keyFieldName, - resultsConsumer - ) == HandleResult.HALT) { - return; + String keyFieldName) throws IOException { + var searcher = new TopDocsSearcher(indexSearcher, + query, + luceneSort, + offset + limit, + null, + scoreMode != ScoreMode.COMPLETE_NO_SCORES, + 1000 + ); + return new LuceneSearchInstance() { + @Override + public long getTotalHitsCount() throws IOException { + return searcher.getTopDocs(0, 1).totalHits.value; } - } + + @Override + public void getResults(ResultItemConsumer resultsConsumer) throws IOException { + ObjectArrayList hits = ObjectArrayList.wrap(searcher.getTopDocs(offset, limit).scoreDocs); + for (ScoreDoc hit : hits) { + int docId = hit.doc; + float score = hit.score; + if (LuceneUtils.collectTopDoc(logger, + docId, + score, + minCompetitiveScore, + indexSearcher, + keyFieldName, + resultsConsumer + ) == HandleResult.HALT) { + return; + } + } + } + }; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java new file mode 100644 index 0000000..2627f21 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/TopDocsSearcher.java @@ -0,0 +1,45 @@ +package it.cavallium.dbengine.lucene.searcher; + +import java.io.IOException; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.search.TopFieldCollector; +import org.apache.lucene.search.TopScoreDocCollector; + +class TopDocsSearcher { + + private final TopDocsCollector collector; + private final boolean doDocScores; + private final IndexSearcher indexSearcher; + private final Query query; + + public TopDocsSearcher(IndexSearcher indexSearcher, + Query query, + Sort luceneSort, + int limit, + FieldDoc after, + boolean doDocScores, + int totalHitsThreshold) throws IOException { + if (luceneSort == null) { + this.collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold); + } else { + this.collector = TopFieldCollector.create(luceneSort, limit, after, totalHitsThreshold); + } + this.indexSearcher = indexSearcher; + this.query = query; + this.doDocScores = doDocScores; + indexSearcher.search(query, collector); + } + + public TopDocs getTopDocs(int offset, int length) throws IOException { + TopDocs topDocs = collector.topDocs(offset, length); + if (doDocScores) { + TopFieldCollector.populateScores(topDocs.scoreDocs, indexSearcher, query); + } + return topDocs; + } +}