Implement offsets in Lucene search

This commit is contained in:
Andrea Cavalli 2021-04-01 19:48:25 +02:00
parent 3e6573d955
commit 918ff71091
16 changed files with 394 additions and 200 deletions

View File

@ -175,6 +175,7 @@ versions:
QueryParams:
data:
query: Query
offset: long
limit: long
minCompetitiveScore: -float
sort: Sort

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.client;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLSearchResult;
@ -78,9 +79,26 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
return luceneIndex.deleteAll();
}
/**
 * Adapts the query parameters to the offset capabilities of the target index.
 * Indexes that can apply an offset natively receive the params untouched;
 * otherwise the offset is cleared here and must be applied client-side
 * afterwards (see fixTransformOffset).
 *
 * @param luceneIndex the index that will execute the query
 * @param queryParams the requested query parameters
 * @return the parameters to actually send to the index
 */
private static QueryParams fixOffset(LLLuceneIndex luceneIndex, QueryParams queryParams) {
	return luceneIndex.supportsOffset() ? queryParams : queryParams.setOffset(0);
}
/**
 * Complement of fixOffset: computes the offset that still has to be applied
 * while merging/transforming the result streams. When the index already
 * skipped the offset natively there is nothing left to skip client-side,
 * otherwise the full offset must be applied during the transform step.
 *
 * @param luceneIndex the index that executed the query
 * @param offset the offset originally requested by the client
 * @return the residual offset to apply client-side (0 if none)
 */
private static long fixTransformOffset(LLLuceneIndex luceneIndex, long offset) {
	return luceneIndex.supportsOffset() ? 0 : offset;
}
private Mono<SearchResultKeys<T>> transformLuceneResult(LLSearchResult llSearchResult,
@Nullable MultiSort<SearchResultKey<T>> sort,
LLScoreMode scoreMode,
long offset,
@Nullable Long limit) {
Flux<SearchResultKeys<T>> mappedKeys = llSearchResult
.getResults()
@ -104,12 +122,13 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
} else {
mappedSort = null;
}
return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, limit);
return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, offset, limit);
}
private Mono<SearchResult<T, U>> transformLuceneResultWithValues(LLSearchResult llSearchResult,
@Nullable MultiSort<SearchResultItem<T, U>> sort,
LLScoreMode scoreMode,
long offset,
@Nullable Long limit,
ValueGetter<T, U> valueGetter) {
Flux<SearchResult<T, U>> mappedKeys = llSearchResult
@ -135,7 +154,7 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
} else {
mappedSort = null;
}
return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, limit);
return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, offset, limit);
}
/**
@ -152,15 +171,17 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
Flux<Tuple2<String, Set<String>>> mltDocumentFields
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
return luceneIndex
.moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields)
.moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields)
.flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult,
queryParams.getSort(),
queryParams.getScoreMode(),
fixTransformOffset(luceneIndex, queryParams.getOffset()),
queryParams.getLimit()
));
}
/**
*
* @param queryParams the limit is valid for each lucene instance.
@ -177,13 +198,14 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
return luceneIndex
.moreLikeThis(resolveSnapshot(queryParams.getSnapshot()),
queryParams.toQueryParams(),
fixOffset(luceneIndex, queryParams.toQueryParams()),
indicizer.getKeyFieldName(),
mltDocumentFields
)
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
queryParams.getSort(),
queryParams.getScoreMode(),
fixTransformOffset(luceneIndex, queryParams.getOffset()),
queryParams.getLimit(),
valueGetter
));
@ -199,10 +221,14 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
public Mono<SearchResultKeys<T>> search(
ClientQueryParams<SearchResultKey<T>> queryParams) {
return luceneIndex
.search(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName())
.search(resolveSnapshot(queryParams.getSnapshot()),
fixOffset(luceneIndex, queryParams.toQueryParams()),
indicizer.getKeyFieldName()
)
.flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult,
queryParams.getSort(),
queryParams.getScoreMode(),
fixTransformOffset(luceneIndex, queryParams.getOffset()),
queryParams.getLimit()
));
}
@ -218,10 +244,11 @@ public class LuceneIndex<T, U> implements LLSnapshottable {
ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueGetter<T, U> valueGetter) {
return luceneIndex
.search(resolveSnapshot(queryParams.getSnapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName())
.search(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName())
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
queryParams.getSort(),
queryParams.getScoreMode(),
fixTransformOffset(luceneIndex, queryParams.getOffset()),
queryParams.getLimit(),
valueGetter
));

View File

@ -35,6 +35,9 @@ public final class ClientQueryParams<T> {
@NonNull
private Query query;
@Default
private long offset = 0;
@Default
private long limit = Long.MAX_VALUE;
@ -75,6 +78,7 @@ public final class ClientQueryParams<T> {
.query(getQuery())
.sort(getSort() != null ? getSort().getQuerySort() : NoSort.of())
.minCompetitiveScore(Nullablefloat.ofNullable(getMinCompetitiveScore()))
.offset(getOffset())
.limit(getLimit())
.scoreMode(toScoreMode())
.build();

View File

@ -49,15 +49,17 @@ public interface LLLuceneIndex extends LLSnapshottable {
Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName);
default Mono<Long> count(@Nullable LLSnapshot snapshot, Query query) {
QueryParams params = QueryParams.of(query, 0, Nullablefloat.empty(), NoSort.of(), ScoreMode.of(false, false));
QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), ScoreMode.of(false, false));
return Mono.from(this.search(snapshot, params, null)
.flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results.getResults(), null, null))
.flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results.getResults(), null, 0, null))
.map(LLSearchResultShard::getTotalHitsCount)
.defaultIfEmpty(0L));
}
boolean isLowMemoryMode();
boolean supportsOffset();
Mono<Void> close();
/**

View File

@ -19,6 +19,7 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneSearchInstance;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import java.io.IOException;
@ -451,6 +452,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.subscribeOn(luceneQueryScheduler)
.map(luceneQuery -> luceneSearch(doDistributedPre,
indexSearcher,
queryParams.getOffset(),
queryParams.getLimit(),
queryParams.getMinCompetitiveScore().getNullable(),
keyFieldName,
@ -510,6 +512,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return luceneSearch(doDistributedPre,
indexSearcher,
queryParams.getOffset(),
queryParams.getLimit(),
queryParams.getMinCompetitiveScore().getNullable(),
keyFieldName,
@ -534,6 +537,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private LLSearchResult luceneSearch(boolean doDistributedPre,
IndexSearcher indexSearcher,
long offset,
long limit,
@Nullable Float minCompetitiveScore,
String keyFieldName,
@ -543,26 +547,29 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
ScoreMode luceneScoreMode) {
return new LLSearchResult(Mono.<LLSearchResultShard>create(monoSink -> {
LuceneSearchInstance luceneSearchInstance;
long totalHitsCount;
try {
if (!doDistributedPre) {
AtomicLong totalHitsCountAtomic = new AtomicLong(0);
if (doDistributedPre) {
//noinspection BlockingMethodInNonBlockingContext
streamSearcher.search(indexSearcher,
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
monoSink.success(new LLSearchResultShard(Flux.empty(), 0));
return;
} else {
int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset);
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
//noinspection BlockingMethodInNonBlockingContext
luceneSearchInstance = streamSearcher.search(indexSearcher,
luceneQuery,
0,
boundedOffset,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> HandleResult.HALT,
totalHitsCountAtomic::set
keyFieldName
);
totalHitsCount = totalHitsCountAtomic.get();
} else {
//noinspection BlockingMethodInNonBlockingContext
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
totalHitsCount = 0;
totalHitsCount = luceneSearchInstance.getTotalHitsCount();
}
} catch (Exception ex) {
monoSink.error(ex);
@ -590,43 +597,30 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
});
try {
if (!doDistributedPre) {
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
AtomicLong atomicTotalHitsCount = new AtomicLong(0);
//noinspection BlockingMethodInNonBlockingContext
streamSearcher.search(indexSearcher,
luceneQuery,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
keyScore -> {
try {
if (cancelled.get()) {
return HandleResult.HALT;
}
while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
if (cancelled.get()) {
return HandleResult.HALT;
}
}
sink.next(fixKeyScore(keyScore, scoreDivisor));
if (cancelled.get()) {
return HandleResult.HALT;
} else {
return HandleResult.CONTINUE;
}
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requests.release(Integer.MAX_VALUE);
return HandleResult.HALT;
}
},
atomicTotalHitsCount::set
);
}
//noinspection BlockingMethodInNonBlockingContext
luceneSearchInstance.getResults(keyScore -> {
try {
if (cancelled.get()) {
return HandleResult.HALT;
}
while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
if (cancelled.get()) {
return HandleResult.HALT;
}
}
sink.next(fixKeyScore(keyScore, scoreDivisor));
if (cancelled.get()) {
return HandleResult.HALT;
} else {
return HandleResult.CONTINUE;
}
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requests.release(Integer.MAX_VALUE);
return HandleResult.HALT;
}
});
sink.complete();
} catch (Exception ex) {
sink.error(ex);
@ -720,4 +714,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public boolean isLowMemoryMode() {
return lowMemory;
}
@Override
public boolean supportsOffset() {
return true;
}
}

View File

@ -212,6 +212,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
QueryParams queryParams,
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFields) {
if (queryParams.getOffset() != 0) {
return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0"));
}
long actionId;
int scoreDivisor;
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsShared;
@ -284,6 +287,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
long actionId;
int scoreDivisor;
Mono<Void> distributedPre;
if (queryParams.getOffset() != 0) {
return Mono.error(new IllegalArgumentException("MultiLuceneIndex requires an offset equal to 0"));
}
if (luceneIndices.length <= 1 || !queryParams.getScoreMode().getComputeScores()) {
actionId = -1;
scoreDivisor = 1;
@ -410,4 +416,9 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
public boolean isLowMemoryMode() {
return luceneIndices[0].isLowMemoryMode();
}
@Override
public boolean supportsOffset() {
return false;
}
}

View File

@ -181,6 +181,7 @@ public class LuceneUtils {
*/
public static <T> Flux<T> mergeStream(Flux<Flux<T>> mappedMultiResults,
@Nullable MultiSort<T> sort,
long offset,
@Nullable Long limit) {
if (limit != null && limit == 0) {
return mappedMultiResults.flatMap(f -> f).ignoreElements().flux();
@ -193,10 +194,16 @@ public class LuceneUtils {
//noinspection unchecked
mergedFlux = Flux.mergeOrdered(32, sort.getResultSort(), mappedMultiResultsList.toArray(Flux[]::new));
}
if (limit == null || limit == Long.MAX_VALUE) {
return mergedFlux;
Flux<T> offsetedFlux;
if (offset > 0) {
offsetedFlux = mergedFlux.skip(offset);
} else {
return mergedFlux.limitRequest(limit);
offsetedFlux = mergedFlux;
}
if (limit == null || limit == Long.MAX_VALUE) {
return offsetedFlux;
} else {
return offsetedFlux.limitRequest(limit);
}
});
}
@ -236,31 +243,34 @@ public class LuceneUtils {
public static <T> Mono<SearchResultKeys<T>> mergeSignalStreamKeys(Flux<SearchResultKeys<T>> mappedKeys,
MultiSort<SearchResultKey<T>> sort,
long offset,
Long limit) {
return mappedKeys.reduce(
new SearchResultKeys<>(Flux.empty(), 0L),
(a, b) -> new SearchResultKeys<T>(LuceneUtils
.mergeStream(Flux.just(a.getResults(), b.getResults()), sort, limit), a.getTotalHitsCount() + b.getTotalHitsCount())
.mergeStream(Flux.just(a.getResults(), b.getResults()), sort, offset, limit), a.getTotalHitsCount() + b.getTotalHitsCount())
);
}
public static <T, U> Mono<SearchResult<T, U>> mergeSignalStreamItems(Flux<SearchResult<T, U>> mappedKeys,
MultiSort<SearchResultItem<T, U>> sort,
long offset,
Long limit) {
return mappedKeys.reduce(
new SearchResult<>(Flux.empty(), 0L),
(a, b) -> new SearchResult<T, U>(LuceneUtils
.mergeStream(Flux.just(a.getResults(), b.getResults()), sort, limit), a.getTotalHitsCount() + b.getTotalHitsCount())
.mergeStream(Flux.just(a.getResults(), b.getResults()), sort, offset, limit), a.getTotalHitsCount() + b.getTotalHitsCount())
);
}
public static Mono<LLSearchResultShard> mergeSignalStreamRaw(Flux<LLSearchResultShard> mappedKeys,
MultiSort<LLKeyScore> mappedSort,
long offset,
Long limit) {
return mappedKeys.reduce(
new LLSearchResultShard(Flux.empty(), 0),
(s1, s2) -> new LLSearchResultShard(
LuceneUtils.mergeStream(Flux.just(s1.getResults(), s2.getResults()), mappedSort, limit),
LuceneUtils.mergeStream(Flux.just(s1.getResults(), s2.getResults()), mappedSort, offset, limit),
s1.getTotalHitsCount() + s2.getTotalHitsCount()
)
);

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.lucene.searcher;
import java.io.IOException;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
@ -21,56 +20,52 @@ public class AdaptiveStreamSearcher implements LuceneStreamSearcher {
public AdaptiveStreamSearcher() {
this.simpleStreamSearcher = new SimpleStreamSearcher();
this.parallelCollectorStreamSearcher = new ParallelCollectorStreamSearcher();
this.pagedStreamSearcher = new PagedStreamSearcher(simpleStreamSearcher);
this.countStreamSearcher = new CountStreamSearcher();
this.parallelCollectorStreamSearcher = new ParallelCollectorStreamSearcher(countStreamSearcher);
this.pagedStreamSearcher = new PagedStreamSearcher(simpleStreamSearcher);
}
@Override
public void search(IndexSearcher indexSearcher,
public LuceneSearchInstance search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
ResultItemConsumer consumer,
LongConsumer totalHitsConsumer) throws IOException {
String keyFieldName) throws IOException {
if (limit == 0) {
totalHitsConsumer.accept(countStreamSearcher.count(indexSearcher, query));
} else if (luceneSort == null && ENABLE_PARALLEL_COLLECTOR) {
parallelCollectorStreamSearcher.search(indexSearcher,
return countStreamSearcher.count(indexSearcher, query);
} else if (offset == 0 && luceneSort == null && ENABLE_PARALLEL_COLLECTOR) {
return parallelCollectorStreamSearcher.search(indexSearcher,
query,
offset,
limit,
null,
scoreMode,
minCompetitiveScore,
keyFieldName,
consumer,
totalHitsConsumer
keyFieldName
);
} else {
if (luceneSort != null && limit > PagedStreamSearcher.MAX_ITEMS_PER_PAGE) {
pagedStreamSearcher.search(indexSearcher,
if (offset > 0 || limit > PagedStreamSearcher.MAX_ITEMS_PER_PAGE) {
return pagedStreamSearcher.search(indexSearcher,
query,
offset,
limit,
luceneSort,
scoreMode,
minCompetitiveScore,
keyFieldName,
consumer,
totalHitsConsumer
keyFieldName
);
} else {
simpleStreamSearcher.search(indexSearcher,
return simpleStreamSearcher.search(indexSearcher,
query,
offset,
limit,
luceneSort,
scoreMode,
minCompetitiveScore,
keyFieldName,
consumer,
totalHitsConsumer
keyFieldName
);
}
}

View File

@ -2,7 +2,6 @@ package it.cavallium.dbengine.lucene.searcher;
import java.io.IOException;
import java.util.Collection;
import java.util.function.LongConsumer;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
@ -21,19 +20,18 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea
public void search(IndexSearcher indexSearcher,
Query query) throws IOException {
search(indexSearcher, query, 0, null, null, null, null, null, null);
search(indexSearcher, query, 0, 0, null, null, null, null);
}
@Override
public void search(IndexSearcher indexSearcher,
public LuceneSearchInstance search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
String keyFieldName) throws IOException {
if (limit > 0) {
throw new IllegalArgumentException("Limit > 0 not allowed");
}
@ -49,12 +47,6 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea
if (keyFieldName != null) {
throw new IllegalArgumentException("Key field name not allowed");
}
if (resultsConsumer != null) {
throw new IllegalArgumentException("Results consumer not allowed");
}
if (totalHitsConsumer != null) {
throw new IllegalArgumentException("Total hits consumer not allowed");
}
indexSearcher.search(query, new CollectorManager<>() {
@Override
public Collector newCollector() {
@ -85,5 +77,17 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea
return null;
}
});
return new LuceneSearchInstance() {
@Override
public long getTotalHitsCount() throws IOException {
throw new IllegalArgumentException("Total hits consumer not allowed");
}
@Override
public void getResults(ResultItemConsumer consumer) throws IOException {
throw new IllegalArgumentException("Results consumer not allowed");
}
};
}
}

View File

@ -1,7 +1,6 @@
package it.cavallium.dbengine.lucene.searcher;
import java.io.IOException;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
@ -14,31 +13,42 @@ import org.jetbrains.annotations.Nullable;
public class CountStreamSearcher implements LuceneStreamSearcher {
@Override
public void search(IndexSearcher indexSearcher,
public LuceneSearchInstance search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
String keyFieldName) throws IOException {
if (limit != 0) {
throw new IllegalArgumentException("CountStream doesn't support a limit different than 0");
}
if (luceneSort != null) {
throw new IllegalArgumentException("CountStream doesn't support sorting");
}
if (resultsConsumer != null) {
throw new IllegalArgumentException("CountStream doesn't support a results consumer");
}
if (keyFieldName != null) {
throw new IllegalArgumentException("CountStream doesn't support a key field");
}
totalHitsConsumer.accept(count(indexSearcher, query));
return count(indexSearcher, query);
}
public long count(IndexSearcher indexSearcher, Query query) throws IOException {
public long countLong(IndexSearcher indexSearcher, Query query) throws IOException {
return indexSearcher.count(query);
}
public LuceneSearchInstance count(IndexSearcher indexSearcher, Query query) throws IOException {
long totalHitsCount = countLong(indexSearcher, query);
return new LuceneSearchInstance() {
@Override
public long getTotalHitsCount() {
return totalHitsCount;
}
@Override
public void getResults(ResultItemConsumer consumer) {
}
};
}
}

View File

@ -0,0 +1,11 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.ResultItemConsumer;
import java.io.IOException;
/**
 * Handle to an already-executed Lucene search: exposes the total hit count and
 * lets the caller stream the matching results on demand, decoupling "run the
 * query" from "consume the results".
 */
public interface LuceneSearchInstance {
/**
 * @return the total number of hits matched by the query
 * @throws IOException if the count cannot be computed
 */
long getTotalHitsCount() throws IOException;
/**
 * Feeds each collected result (key/score pair) to the given consumer.
 * Implementations may stop iterating early when the consumer returns
 * {@code HandleResult.HALT}.
 *
 * @param consumer receives each result item
 * @throws IOException if reading the results fails
 */
void getResults(ResultItemConsumer consumer) throws IOException;
}

View File

@ -2,7 +2,6 @@ package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
@ -19,6 +18,7 @@ public interface LuceneStreamSearcher {
* Do a lucene query, receiving the single results using a consumer
* @param indexSearcher the index searcher, which contains all the lucene data
* @param query the query
* @param offset the offset of the first result (use 0 to disable offset)
* @param limit the maximum number of results
* @param luceneSort the sorting method used for the search
* @param scoreMode score mode
@ -28,15 +28,14 @@ public interface LuceneStreamSearcher {
* @param totalHitsConsumer the consumer of total count of results
* @throws IOException thrown if there is an error
*/
void search(IndexSearcher indexSearcher,
LuceneSearchInstance search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException;
String keyFieldName) throws IOException;
@FunctionalInterface
interface ResultItemConsumer {

View File

@ -2,7 +2,7 @@ package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.function.LongConsumer;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
@ -18,58 +18,105 @@ import org.warp.commonutils.type.IntWrapper;
public class PagedStreamSearcher implements LuceneStreamSearcher {
public static final int MAX_ITEMS_PER_PAGE = 1000;
private final LuceneStreamSearcher baseStreamSearcher;
private final SimpleStreamSearcher simpleStreamSearcher;
public PagedStreamSearcher(LuceneStreamSearcher baseStreamSearcher) {
this.baseStreamSearcher = baseStreamSearcher;
public PagedStreamSearcher(SimpleStreamSearcher simpleStreamSearcher) {
this.simpleStreamSearcher = simpleStreamSearcher;
}
@Override
public void search(IndexSearcher indexSearcher,
public LuceneSearchInstance search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
String keyFieldName) throws IOException {
if (limit < MAX_ITEMS_PER_PAGE) {
// Use a normal search method because the limit is low
baseStreamSearcher.search(indexSearcher,
simpleStreamSearcher.search(indexSearcher,
query,
offset,
limit,
luceneSort,
scoreMode,
minCompetitiveScore,
keyFieldName,
resultsConsumer,
totalHitsConsumer
keyFieldName
);
return;
}
IntWrapper currentAllowedResults = new IntWrapper(limit);
// Run the first page search
TopDocs lastTopDocs = indexSearcher.search(query, MAX_ITEMS_PER_PAGE, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
totalHitsConsumer.accept(lastTopDocs.totalHits.value);
if (lastTopDocs.scoreDocs.length > 0) {
ScoreDoc lastScoreDoc = getLastItem(lastTopDocs.scoreDocs);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName, resultsConsumer);
TopDocs firstTopDocsVal;
if (offset == 0) {
firstTopDocsVal = indexSearcher.search(query,
MAX_ITEMS_PER_PAGE,
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
} else {
firstTopDocsVal = new TopDocsSearcher(indexSearcher,
query,
luceneSort,
MAX_ITEMS_PER_PAGE,
null,
scoreMode != ScoreMode.COMPLETE_NO_SCORES,
1000
).getTopDocs(offset, MAX_ITEMS_PER_PAGE);
}
AtomicReference<TopDocs> firstTopDocs = new AtomicReference<>(firstTopDocsVal);
long totalHitsCount = firstTopDocs.getPlain().totalHits.value;
// Run the searches for each page until the end
boolean finished = currentAllowedResults.var <= 0;
while (!finished) {
lastTopDocs = indexSearcher.searchAfter(lastScoreDoc, query, MAX_ITEMS_PER_PAGE, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
return new LuceneSearchInstance() {
@Override
public long getTotalHitsCount() {
return totalHitsCount;
}
@Override
public void getResults(ResultItemConsumer resultsConsumer) throws IOException {
TopDocs lastTopDocs = firstTopDocs.getAndSet(null);
if (lastTopDocs.scoreDocs.length > 0) {
lastScoreDoc = getLastItem(lastTopDocs.scoreDocs);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName, resultsConsumer);
}
if (lastTopDocs.scoreDocs.length < MAX_ITEMS_PER_PAGE || currentAllowedResults.var <= 0) {
finished = true;
ScoreDoc lastScoreDoc = getLastItem(lastTopDocs.scoreDocs);
consumeHits(currentAllowedResults,
lastTopDocs.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName,
resultsConsumer
);
// Run the searches for each page until the end
boolean finished = currentAllowedResults.var <= 0;
while (!finished) {
boolean halted;
lastTopDocs = indexSearcher.searchAfter(lastScoreDoc,
query,
MAX_ITEMS_PER_PAGE,
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
if (lastTopDocs.scoreDocs.length > 0) {
lastScoreDoc = getLastItem(lastTopDocs.scoreDocs);
halted = consumeHits(currentAllowedResults,
lastTopDocs.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName,
resultsConsumer
) == HandleResult.HALT;
} else {
halted = false;
}
if (lastTopDocs.scoreDocs.length < MAX_ITEMS_PER_PAGE || currentAllowedResults.var <= 0 || halted) {
finished = true;
}
}
}
}
}
};
}
private HandleResult consumeHits(IntWrapper currentAllowedResults,
@ -103,4 +150,5 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) {
return scoreDocs[scoreDocs.length - 1];
}
}

View File

@ -6,7 +6,6 @@ import it.cavallium.dbengine.lucene.LuceneParallelStreamCollectorResult;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.IndexSearcher;
@ -20,50 +19,71 @@ import org.jetbrains.annotations.Nullable;
*/
public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
private final CountStreamSearcher countStreamSearcher;
public ParallelCollectorStreamSearcher(CountStreamSearcher countStreamSearcher) {
this.countStreamSearcher = countStreamSearcher;
}
@Override
public void search(IndexSearcher indexSearcher,
public LuceneSearchInstance search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
String keyFieldName) throws IOException {
if (offset != 0) {
throw new IllegalArgumentException("ParallelCollectorStreamSearcher doesn't support a offset different than 0");
}
if (luceneSort != null) {
throw new IllegalArgumentException("ParallelCollectorStreamSearcher doesn't support sorted searches");
}
AtomicInteger currentCount = new AtomicInteger();
return new LuceneSearchInstance() {
LuceneParallelStreamCollectorResult result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, minCompetitiveScore, (docId, score) -> {
if (currentCount.getAndIncrement() >= limit) {
return HandleResult.HALT;
} else {
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
logger.error("The document docId: {} is empty.", docId);
var realFields = indexSearcher.doc(docId).getFields();
if (!realFields.isEmpty()) {
logger.error("Present fields:");
for (IndexableField field : realFields) {
logger.error(" - {}", field.name());
}
}
} else {
var field = d.getField(keyFieldName);
if (field == null) {
logger.error("Can't get key of document docId: {}", docId);
} else {
if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) {
return HandleResult.HALT;
}
}
}
return HandleResult.CONTINUE;
long totalHitsCount = countStreamSearcher.countLong(indexSearcher, query);
@Override
public long getTotalHitsCount() throws IOException {
return totalHitsCount;
}
}));
//todo: check the accuracy of our hits counter!
totalHitsConsumer.accept(result.getTotalHitsCount());
@Override
public void getResults(ResultItemConsumer resultsConsumer) throws IOException {
AtomicInteger currentCount = new AtomicInteger();
LuceneParallelStreamCollectorResult result = indexSearcher.search(query,
LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, minCompetitiveScore, (docId, score) -> {
if (currentCount.getAndIncrement() >= limit) {
return HandleResult.HALT;
} else {
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
logger.error("The document docId: {} is empty.", docId);
var realFields = indexSearcher.doc(docId).getFields();
if (!realFields.isEmpty()) {
logger.error("Present fields:");
for (IndexableField field : realFields) {
logger.error(" - {}", field.name());
}
}
} else {
var field = d.getField(keyFieldName);
if (field == null) {
logger.error("Can't get key of document docId: {}", docId);
} else {
if (resultsConsumer.accept(new LLKeyScore(field.stringValue(), score)) == HandleResult.HALT) {
return HandleResult.HALT;
}
}
}
return HandleResult.CONTINUE;
}
}));
this.totalHitsCount = result.getTotalHitsCount();
}
};
}
}

View File

@ -3,13 +3,11 @@ package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.jetbrains.annotations.Nullable;
/**
@ -18,36 +16,46 @@ import org.jetbrains.annotations.Nullable;
public class SimpleStreamSearcher implements LuceneStreamSearcher {
@Override
public void search(IndexSearcher indexSearcher,
public LuceneSearchInstance search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
ResultItemConsumer resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
TopDocs topDocs;
if (luceneSort != null) {
topDocs = indexSearcher.search(query, limit, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
} else {
topDocs = indexSearcher.search(query, limit);
}
totalHitsConsumer.accept(topDocs.totalHits.value);
var hits = ObjectArrayList.wrap(topDocs.scoreDocs);
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
if (LuceneUtils.collectTopDoc(logger,
docId,
score,
minCompetitiveScore,
indexSearcher,
keyFieldName,
resultsConsumer
) == HandleResult.HALT) {
return;
String keyFieldName) throws IOException {
var searcher = new TopDocsSearcher(indexSearcher,
query,
luceneSort,
offset + limit,
null,
scoreMode != ScoreMode.COMPLETE_NO_SCORES,
1000
);
return new LuceneSearchInstance() {
@Override
public long getTotalHitsCount() throws IOException {
return searcher.getTopDocs(0, 1).totalHits.value;
}
}
@Override
public void getResults(ResultItemConsumer resultsConsumer) throws IOException {
ObjectArrayList<ScoreDoc> hits = ObjectArrayList.wrap(searcher.getTopDocs(offset, limit).scoreDocs);
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
if (LuceneUtils.collectTopDoc(logger,
docId,
score,
minCompetitiveScore,
indexSearcher,
keyFieldName,
resultsConsumer
) == HandleResult.HALT) {
return;
}
}
}
};
}
}

View File

@ -0,0 +1,45 @@
package it.cavallium.dbengine.lucene.searcher;
import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopScoreDocCollector;
/**
 * Runs a query eagerly and keeps the collected top docs so that
 * {@link #getTopDocs(int, int)} can be called multiple times — e.g. once to
 * read the total hits count and once to fetch a paginated slice.
 */
class TopDocsSearcher {

	private final TopDocsCollector<?> collector;
	private final boolean doDocScores;
	private final IndexSearcher indexSearcher;
	private final Query query;
	// Results drained from the collector, cached because
	// TopDocsCollector#topDocs consumes its internal priority queue and its
	// javadoc forbids invoking it more than once per search execution.
	private TopDocs cachedTopDocs;

	/**
	 * Executes the query immediately with the appropriate collector.
	 *
	 * @param indexSearcher searcher that executes the query
	 * @param query the lucene query to run
	 * @param luceneSort optional sort; when null, results are ranked by score
	 * @param limit maximum number of hits to collect (offset + page size)
	 * @param after collect only hits ranked after this one, or null to start from the top
	 * @param doDocScores true to populate real scores on the returned hits
	 * @param totalHitsThreshold number of hits to count exactly before switching to a lower bound
	 * @throws IOException if the search fails
	 */
	public TopDocsSearcher(IndexSearcher indexSearcher,
			Query query,
			Sort luceneSort,
			int limit,
			FieldDoc after,
			boolean doDocScores,
			int totalHitsThreshold) throws IOException {
		if (luceneSort == null) {
			this.collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold);
		} else {
			this.collector = TopFieldCollector.create(luceneSort, limit, after, totalHitsThreshold);
		}
		this.indexSearcher = indexSearcher;
		this.query = query;
		this.doDocScores = doDocScores;
		indexSearcher.search(query, collector);
	}

	/**
	 * Returns a slice of the collected results.
	 * Unlike {@link TopDocsCollector#topDocs(int, int)} this method is safe to
	 * call more than once: the collector is drained exactly once and the result
	 * is cached, so repeated calls (e.g. hit count, then page) stay correct.
	 *
	 * @param offset index of the first hit to return
	 * @param length maximum number of hits to return
	 * @return the requested window of hits, with the collector's total hits
	 * @throws IOException if scores cannot be populated
	 */
	public synchronized TopDocs getTopDocs(int offset, int length) throws IOException {
		if (cachedTopDocs == null) {
			cachedTopDocs = collector.topDocs();
			if (doDocScores) {
				TopFieldCollector.populateScores(cachedTopDocs.scoreDocs, indexSearcher, query);
			}
		}
		ScoreDoc[] allHits = cachedTopDocs.scoreDocs;
		// Clamp the requested window to the available hits; long math avoids
		// int overflow when offset + length exceeds Integer.MAX_VALUE.
		int from = Math.max(0, Math.min(offset, allHits.length));
		int to = (int) Math.min((long) from + (long) Math.max(0, length), (long) allHits.length);
		return new TopDocs(cachedTopDocs.totalHits, Arrays.copyOfRange(allHits, from, to));
	}
}