Fully reactive lucene queries

This commit is contained in:
Andrea Cavalli 2021-07-04 01:34:17 +02:00
parent 8a1e4028f7
commit 7929f0dc8c
9 changed files with 614 additions and 105 deletions

View File

@ -20,11 +20,12 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.ScheduledTaskLifecycle;
import it.cavallium.dbengine.lucene.searcher.AdaptiveReactiveSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneSearchInstance;
import it.cavallium.dbengine.lucene.searcher.LuceneReactiveSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import it.cavallium.dbengine.lucene.searcher.SortedPagedLuceneReactiveSearcher;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
@ -36,9 +37,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexCommit;
@ -71,7 +70,6 @@ import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -82,6 +80,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class);
private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
private static final LuceneReactiveSearcher reactiveSearcher = new AdaptiveReactiveSearcher();
private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher
= new AllowOnlyQueryParsingCollectorStreamSearcher();
/**
@ -93,12 +92,16 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene",
Integer.MAX_VALUE,
true
false
);
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
private static final Scheduler luceneSearcherScheduler = Schedulers
.fromExecutorService(Executors
.newCachedThreadPool(new ShortNamedThreadFactory("lucene-searcher")));
private final Scheduler luceneSearcherScheduler = Schedulers.newBoundedElastic(
4,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene-searcher",
60,
false
);
private final String luceneIndexName;
private final SnapshotDeletionPolicy snapshotter;
@ -662,88 +665,47 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Sort luceneSort,
ScoreMode luceneScoreMode,
Mono<Void> successCleanup) {
return new LLSearchResult(Mono.<LLSearchResultShard>create(monoSink -> {
LuceneSearchInstance luceneSearchInstance;
long totalHitsCount;
try {
Flux<LLSearchResultShard> results = Mono
.defer(() -> {
if (doDistributedPre) {
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
monoSink.success(new LLSearchResultShard(successCleanup.thenMany(Flux.empty()), 0));
return;
return Mono.<LLSearchResultShard>create(monoSink -> {
try {
//noinspection BlockingMethodInNonBlockingContext
allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery);
monoSink.success(new LLSearchResultShard(successCleanup.thenMany(Flux.empty()), 0));
} catch (Exception ex) {
monoSink.error(ex);
}
}).subscribeOn(luceneSearcherScheduler);
} else {
int boundedOffset = Math.max(0, offset > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) offset);
int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit);
luceneSearchInstance = streamSearcher.search(indexSearcher,
luceneQuery,
boundedOffset,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName
);
totalHitsCount = luceneSearchInstance.getTotalHitsCount();
return reactiveSearcher
.search(indexSearcher,
luceneQuery,
boundedOffset,
boundedLimit,
luceneSort,
luceneScoreMode,
minCompetitiveScore,
keyFieldName,
luceneSearcherScheduler
)
.map(searchInstance -> new LLSearchResultShard(
Flux
.usingWhen(
Mono.just(true),
_unused -> searchInstance
.results()
.map(keyScore -> fixKeyScore(keyScore, scoreDivisor)),
_unused -> successCleanup
),
searchInstance.totalHitsCount()
));
}
} catch (Exception ex) {
monoSink.error(ex);
return;
}
AtomicBoolean alreadySubscribed = new AtomicBoolean(false);
var resultsFlux = Flux.<LLKeyScore>create(sink -> {
if (!alreadySubscribed.compareAndSet(false, true)) {
sink.error(new IllegalStateException("Already subscribed to results"));
return;
}
AtomicBoolean cancelled = new AtomicBoolean();
Semaphore requests = new Semaphore(0);
sink.onDispose(() -> cancelled.set(true));
sink.onCancel(() -> cancelled.set(true));
sink.onRequest(delta -> requests.release((int) Math.min(delta, Integer.MAX_VALUE)));
luceneSearcherScheduler
.schedule(() -> {
try {
luceneSearchInstance.getResults(keyScore -> {
try {
if (cancelled.get()) {
return HandleResult.HALT;
}
while (!requests.tryAcquire(500, TimeUnit.MILLISECONDS)) {
if (cancelled.get()) {
return HandleResult.HALT;
}
}
sink.next(fixKeyScore(keyScore, scoreDivisor));
if (cancelled.get()) {
return HandleResult.HALT;
} else {
return HandleResult.CONTINUE;
}
} catch (Exception ex) {
sink.error(ex);
cancelled.set(true);
requests.release(Integer.MAX_VALUE);
return HandleResult.HALT;
}
});
sink.complete();
} catch (Exception ex) {
sink.error(ex);
}
});
}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic());
monoSink.success(new LLSearchResultShard(Flux
.usingWhen(
Mono.just(true),
b -> resultsFlux,
b -> successCleanup),
totalHitsCount));
}).subscribeOn(Schedulers.boundedElastic()).flux());
})
.flux();
return new LLSearchResult(results);
}
@Override

View File

@ -223,6 +223,47 @@ public class LuceneUtils {
return HandleResult.CONTINUE;
}
/**
*
* @return the key score, or null if the result is not relevant
* @throws IOException if an error occurs
*/
@Nullable
public static LLKeyScore collectTopDoc(Logger logger, int docId, float score, Float minCompetitiveScore,
IndexSearcher indexSearcher, String keyFieldName) throws IOException {
if (minCompetitiveScore == null || score >= minCompetitiveScore) {
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append("The document docId: ").append(docId).append(", score: ").append(score).append(" is empty.");
var realFields = indexSearcher.doc(docId).getFields();
if (!realFields.isEmpty()) {
sb.append("\n");
logger.error("Present fields:\n");
boolean first = true;
for (IndexableField field : realFields) {
if (first) {
first = false;
} else {
sb.append("\n");
}
sb.append(" - ").append(field.name());
}
}
throw new IOException(sb.toString());
} else {
var field = d.getField(keyFieldName);
if (field == null) {
throw new IOException("Can't get key of document docId: " + docId + ", score: " + score);
} else {
return new LLKeyScore(field.stringValue(), score);
}
}
} else {
return null;
}
}
public static <T> Mono<SearchResultKeys<T>> mergeSignalStreamKeys(Flux<SearchResultKeys<T>> mappedKeys,
MultiSort<SearchResultKey<T>> sort,
long offset,

View File

@ -0,0 +1,64 @@
package it.cavallium.dbengine.lucene.searcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class AdaptiveReactiveSearcher implements LuceneReactiveSearcher {
private static final LuceneReactiveSearcher count = new CountLuceneReactiveSearcher();
private static final LuceneReactiveSearcher sortedPaged = new SortedPagedLuceneReactiveSearcher();
private static final LuceneReactiveSearcher unsortedPaged = new UnsortedPagedLuceneReactiveSearcher();
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
if (limit == 0) {
return count.search(indexSearcher,
query,
offset,
limit,
luceneSort,
scoreMode,
minCompetitiveScore,
keyFieldName,
scheduler
);
}
if (luceneSort != null) {
return sortedPaged.search(indexSearcher,
query,
offset,
limit,
luceneSort,
scoreMode,
minCompetitiveScore,
keyFieldName,
scheduler
);
} else {
return unsortedPaged.search(indexSearcher,
query,
offset,
limit,
luceneSort,
scoreMode,
minCompetitiveScore,
keyFieldName,
scheduler
);
}
}
}

View File

@ -0,0 +1,37 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class CountLuceneReactiveSearcher implements LuceneReactiveSearcher {
@SuppressWarnings("BlockingMethodInNonBlockingContext")
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
return Mono
.fromCallable(() -> new LuceneReactiveSearchInstance(indexSearcher.count(query), Flux.empty()))
.subscribeOn(scheduler);
}
}

View File

@ -0,0 +1,9 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.ResultItemConsumer;
import java.io.IOException;
import reactor.core.publisher.Flux;
public record LuceneReactiveSearchInstance(long totalHitsCount, Flux<LLKeyScore> results) {
}

View File

@ -0,0 +1,39 @@
package it.cavallium.dbengine.lucene.searcher;
import java.io.IOException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public interface LuceneReactiveSearcher {
Logger logger = LoggerFactory.getLogger(LuceneReactiveSearcher.class);
/**
* Do a lucene query, receiving the single results using a consumer
* @param indexSearcher the index searcher, which contains all the lucene data
* @param query the query
* @param offset the offset of the first result (use 0 to disable offset)
* @param limit the maximum number of results
* @param luceneSort the sorting method used for the search
* @param scoreMode score mode
* @param minCompetitiveScore minimum score accepted
* @param keyFieldName the name of the key field
* @param scheduler a blocking scheduler
*/
Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler);
}

View File

@ -0,0 +1,164 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher.HandleResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class SortedPagedLuceneReactiveSearcher implements LuceneReactiveSearcher {
private static final int FIRST_PAGE_HITS_MAX_COUNT = 10;
private static final long MIN_HITS_PER_PAGE = 20;
private static final long MAX_HITS_PER_PAGE = 1000;
@SuppressWarnings("BlockingMethodInNonBlockingContext")
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
if (luceneSort == null) {
return Mono.error(new IllegalArgumentException("Can't execute unsorted queries"));
}
// todo: check if offset and limit play well together.
// check especially these cases:
// - offset > limit
// - offset > FIRST_PAGE_HITS_MAX_COUNT
// - offset > MAX_HITS_PER_PAGE
return Mono
.fromCallable(() -> {
// Run the first page (max 1 item) search
TopDocs firstTopDocsVal;
if (offset == 0) {
firstTopDocsVal = indexSearcher.search(query,
FIRST_PAGE_HITS_MAX_COUNT,
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
} else {
firstTopDocsVal = TopDocsSearcher.getTopDocs(indexSearcher,
query,
luceneSort,
FIRST_PAGE_HITS_MAX_COUNT,
null,
scoreMode != ScoreMode.COMPLETE_NO_SCORES,
1000,
offset, FIRST_PAGE_HITS_MAX_COUNT);
}
long totalHitsCount = firstTopDocsVal.totalHits.value;
Mono<List<LLKeyScore>> firstPageHitsMono = Mono
.fromCallable(() -> convertHits(FIRST_PAGE_HITS_MAX_COUNT, firstTopDocsVal.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName))
.single();
Flux<LLKeyScore> resultsFlux = firstPageHitsMono.flatMapMany(firstPageHits -> {
int firstPageHitsCount = firstPageHits.size();
Flux<LLKeyScore> firstPageHitsFlux = Flux.fromIterable(firstPageHits);
if (firstPageHitsCount < FIRST_PAGE_HITS_MAX_COUNT) {
return Flux.fromIterable(firstPageHits);
} else {
Flux<LLKeyScore> nextPagesFlux = Flux
.<List<LLKeyScore>, PageState>generate(
() -> new PageState(getLastItem(firstTopDocsVal.scoreDocs), 0, limit - firstPageHitsCount),
(s, sink) -> {
if (s.lastItem() == null || s.remainingLimit() <= 0) {
sink.complete();
return new PageState(null, 0,0);
}
try {
var lastTopDocs = indexSearcher.searchAfter(s.lastItem(),
query,
s.hitsPerPage(),
luceneSort,
scoreMode != ScoreMode.COMPLETE_NO_SCORES
);
if (lastTopDocs.scoreDocs.length > 0) {
ScoreDoc lastItem = getLastItem(lastTopDocs.scoreDocs);
var hitsList = convertHits(s.remainingLimit(),
lastTopDocs.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName
);
sink.next(hitsList);
if (hitsList.size() < s.hitsPerPage()) {
return new PageState(lastItem, 0, 0);
} else {
return new PageState(lastItem, s.currentPageIndex() + 1, s.remainingLimit() - hitsList.size());
}
} else {
sink.complete();
return new PageState(null, 0, 0);
}
} catch (IOException e) {
sink.error(e);
return new PageState(null, 0, 0);
}
}
)
.subscribeOn(scheduler)
.flatMap(Flux::fromIterable);
return Flux.concat(firstPageHitsFlux, nextPagesFlux);
}
});
if (limit == 0) {
return new LuceneReactiveSearchInstance(totalHitsCount, Flux.empty());
} else {
return new LuceneReactiveSearchInstance(totalHitsCount, resultsFlux);
}
})
.subscribeOn(scheduler);
}
private List<LLKeyScore> convertHits(int currentAllowedResults,
ScoreDoc[] hits,
IndexSearcher indexSearcher,
@Nullable Float minCompetitiveScore,
String keyFieldName) throws IOException {
ArrayList<LLKeyScore> collectedResults = new ArrayList<>(hits.length);
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
if (currentAllowedResults-- > 0) {
@Nullable LLKeyScore collectedDoc = LuceneUtils.collectTopDoc(logger, docId, score,
minCompetitiveScore, indexSearcher, keyFieldName);
if (collectedDoc != null) {
collectedResults.add(collectedDoc);
}
} else {
break;
}
}
return collectedResults;
}
private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) {
return scoreDocs[scoreDocs.length - 1];
}
private record PageState(ScoreDoc lastItem, int currentPageIndex, int remainingLimit) {
public int hitsPerPage() {
return (int) Math.min(MAX_HITS_PER_PAGE, MIN_HITS_PER_PAGE * (1L << currentPageIndex));
}
}
}

View File

@ -12,48 +12,87 @@ import org.apache.lucene.search.TopScoreDocCollector;
class TopDocsSearcher {
private final TopDocsCollector<?> collector;
private final boolean doDocScores;
private final IndexSearcher indexSearcher;
private final Query query;
private final Sort luceneSort;
private final int limit;
private final FieldDoc after;
private final int totalHitsThreshold;
@Deprecated
public TopDocsSearcher(IndexSearcher indexSearcher,
Query query,
Sort luceneSort,
int limit,
FieldDoc after,
boolean doDocScores,
int totalHitsThreshold) throws IOException {
if (luceneSort == null) {
this.collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold);
} else {
this.collector = TopFieldCollector.create(luceneSort, limit, after, totalHitsThreshold);
}
int totalHitsThreshold) {
this.indexSearcher = indexSearcher;
this.query = query;
this.luceneSort = luceneSort;
this.limit = limit;
this.after = after;
this.doDocScores = doDocScores;
indexSearcher.search(query, collector);
this.totalHitsThreshold = totalHitsThreshold;
}
/**
* This method must not be called more than once!
*/
@Deprecated
public TopDocs getTopDocs(int offset, int limit) throws IOException {
return getTopDocs(indexSearcher, query, luceneSort, limit, after, doDocScores, totalHitsThreshold, offset, limit);
}
/**
* This method must not be called more than once!
*/
@Deprecated
public TopDocs getTopDocs() throws IOException {
return getTopDocs(indexSearcher, query, luceneSort, limit, after, doDocScores, totalHitsThreshold);
}
public static TopDocs getTopDocs(IndexSearcher indexSearcher,
Query query,
Sort luceneSort,
int limit,
FieldDoc after,
boolean doDocScores,
int totalHitsThreshold,
int topDocsStartOffset,
int topDocsHowMany) throws IOException {
TopDocsCollector<?> collector;
if (luceneSort == null) {
collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold);
} else {
collector = TopFieldCollector.create(luceneSort, limit, after, totalHitsThreshold);
}
TopDocs topDocs = collector.topDocs(topDocsStartOffset, topDocsHowMany);
if (doDocScores) {
TopFieldCollector.populateScores(topDocs.scoreDocs, indexSearcher, query);
}
return topDocs;
}
public static TopDocs getTopDocs(IndexSearcher indexSearcher,
Query query,
Sort luceneSort,
int limit,
FieldDoc after,
boolean doDocScores,
int totalHitsThreshold) throws IOException {
TopDocsCollector<?> collector;
if (luceneSort == null) {
collector = TopScoreDocCollector.create(limit, after, totalHitsThreshold);
} else {
collector = TopFieldCollector.create(luceneSort, limit, after, totalHitsThreshold);
}
TopDocs topDocs = collector.topDocs();
if (doDocScores) {
TopFieldCollector.populateScores(topDocs.scoreDocs, indexSearcher, query);
}
return topDocs;
}
/**
* This method must not be called more than once!
*/
public TopDocs getTopDocs(int offset, int length) throws IOException {
TopDocs topDocs = collector.topDocs(offset, length);
if (doDocScores) {
TopFieldCollector.populateScores(topDocs.scoreDocs, indexSearcher, query);
}
return topDocs;
}
}

View File

@ -0,0 +1,154 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
public class UnsortedPagedLuceneReactiveSearcher implements LuceneReactiveSearcher {
private static final int FIRST_PAGE_HITS_MAX_COUNT = 10;
private static final long MIN_HITS_PER_PAGE = 20;
private static final long MAX_HITS_PER_PAGE = 1000;
@SuppressWarnings("BlockingMethodInNonBlockingContext")
@Override
public Mono<LuceneReactiveSearchInstance> search(IndexSearcher indexSearcher,
Query query,
int offset,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
@Nullable Float minCompetitiveScore,
String keyFieldName,
Scheduler scheduler) {
if (luceneSort != null) {
return Mono.error(new IllegalArgumentException("Can't search sorted queries"));
}
// todo: check if offset and limit play well together.
// check especially these cases:
// - offset > limit
// - offset > FIRST_PAGE_HITS_MAX_COUNT
// - offset > MAX_HITS_PER_PAGE
return Mono
.fromCallable(() -> {
// Run the first page (max 1 item) search
TopDocs firstTopDocsVal;
if (offset == 0) {
firstTopDocsVal = indexSearcher.search(query, FIRST_PAGE_HITS_MAX_COUNT);
} else {
firstTopDocsVal = TopDocsSearcher.getTopDocs(indexSearcher,
query,
null,
FIRST_PAGE_HITS_MAX_COUNT,
null,
scoreMode != ScoreMode.COMPLETE_NO_SCORES,
1000,
offset, FIRST_PAGE_HITS_MAX_COUNT);
}
long totalHitsCount = firstTopDocsVal.totalHits.value;
Mono<List<LLKeyScore>> firstPageHitsMono = Mono
.fromCallable(() -> convertHits(FIRST_PAGE_HITS_MAX_COUNT, firstTopDocsVal.scoreDocs, indexSearcher, minCompetitiveScore, keyFieldName))
.single();
Flux<LLKeyScore> resultsFlux = firstPageHitsMono.flatMapMany(firstPageHits -> {
int firstPageHitsCount = firstPageHits.size();
Flux<LLKeyScore> firstPageHitsFlux = Flux.fromIterable(firstPageHits);
if (firstPageHitsCount < FIRST_PAGE_HITS_MAX_COUNT) {
return Flux.fromIterable(firstPageHits);
} else {
Flux<LLKeyScore> nextPagesFlux = Flux
.<List<LLKeyScore>, PageState>generate(
() -> new PageState(getLastItem(firstTopDocsVal.scoreDocs), 0, limit - firstPageHitsCount),
(s, sink) -> {
if (s.lastItem() == null || s.remainingLimit() <= 0) {
sink.complete();
return new PageState(null, 0,0);
}
try {
var lastTopDocs = indexSearcher.searchAfter(s.lastItem(), query, s.hitsPerPage());
if (lastTopDocs.scoreDocs.length > 0) {
ScoreDoc lastItem = getLastItem(lastTopDocs.scoreDocs);
var hitsList = convertHits(s.remainingLimit(),
lastTopDocs.scoreDocs,
indexSearcher,
minCompetitiveScore,
keyFieldName
);
sink.next(hitsList);
if (hitsList.size() < s.hitsPerPage()) {
return new PageState(lastItem, 0, 0);
} else {
return new PageState(lastItem, s.currentPageIndex() + 1, s.remainingLimit() - hitsList.size());
}
} else {
sink.complete();
return new PageState(null, 0, 0);
}
} catch (IOException e) {
sink.error(e);
return new PageState(null, 0, 0);
}
}
)
.subscribeOn(scheduler)
.flatMap(Flux::fromIterable);
return Flux.concat(firstPageHitsFlux, nextPagesFlux);
}
});
if (limit == 0) {
return new LuceneReactiveSearchInstance(totalHitsCount, Flux.empty());
} else {
return new LuceneReactiveSearchInstance(totalHitsCount, resultsFlux);
}
})
.subscribeOn(scheduler);
}
private List<LLKeyScore> convertHits(int currentAllowedResults,
ScoreDoc[] hits,
IndexSearcher indexSearcher,
@Nullable Float minCompetitiveScore,
String keyFieldName) throws IOException {
ArrayList<LLKeyScore> collectedResults = new ArrayList<>(hits.length);
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
if (currentAllowedResults-- > 0) {
@Nullable LLKeyScore collectedDoc = LuceneUtils.collectTopDoc(logger, docId, score,
minCompetitiveScore, indexSearcher, keyFieldName);
if (collectedDoc != null) {
collectedResults.add(collectedDoc);
}
} else {
break;
}
}
return collectedResults;
}
private static ScoreDoc getLastItem(ScoreDoc[] scoreDocs) {
return scoreDocs[scoreDocs.length - 1];
}
private record PageState(ScoreDoc lastItem, int currentPageIndex, int remainingLimit) {
public int hitsPerPage() {
return (int) Math.min(MAX_HITS_PER_PAGE, MIN_HITS_PER_PAGE * (1L << currentPageIndex));
}
}
}