This commit is contained in:
Andrea Cavalli 2021-09-09 12:31:49 +02:00
parent cb5a5e36f2
commit 48ba4d15de

View File

@ -31,10 +31,11 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
private final Query luceneQuery;
private final PaginationInfo paginationInfo;
public UnscoredPagedLuceneShardSearcher(CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> unsortedCollectorManager,
public UnscoredPagedLuceneShardSearcher(
CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> firstPagensortedCollectorManager,
Query luceneQuery,
PaginationInfo paginationInfo) {
this.firstPageUnsortedCollectorManager = unsortedCollectorManager;
this.firstPageUnsortedCollectorManager = firstPagensortedCollectorManager;
this.luceneQuery = luceneQuery;
this.paginationInfo = paginationInfo;
}
@ -83,56 +84,60 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
Flux<LLKeyScore> firstPageHits = LuceneUtils
.convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler, false);
Flux<LLKeyScore> nextHits = Flux.defer(() -> {
if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
return Flux.empty();
}
return Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
Query luceneQuery = queryParams.query();
UnscoredTopDocsCollectorManager currentPageUnsortedCollectorManager = new UnscoredTopDocsCollectorManager(
() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), s.currentPageLimit(),
s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()),
0, s.currentPageLimit(), queryParams.sort());
Flux<LLKeyScore> nextHits = Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
int perShardCollectorLimit = s.currentPageLimit() / indexSearchersArray.size();
if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
Query luceneQuery = queryParams.query();
UnscoredTopDocsCollectorManager currentPageUnsortedCollectorManager
= new UnscoredTopDocsCollectorManager(
() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), perShardCollectorLimit,
s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()),
0, s.currentPageLimit(), queryParams.sort());
try {
var collectors = new ObjectArrayList<TopDocsCollector<ScoreDoc>>(indexSearchersArray.size());
for (IndexSearcher indexSearcher : indexSearchersArray) {
//noinspection BlockingMethodInNonBlockingContext
var collector = currentPageUnsortedCollectorManager.newCollector();
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
collectors.add(collector);
}
try {
var collectors = new ObjectArrayList<TopDocsCollector<ScoreDoc>>(indexSearchersArray.size());
for (IndexSearcher indexSearcher : indexSearchersArray) {
//noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = currentPageUnsortedCollectorManager.reduce(collectors);
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
var collector = currentPageUnsortedCollectorManager.newCollector();
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(),
s.pageIndex() + 1);
} catch (IOException ex) {
sink.error(ex);
return EMPTY_STATUS;
collectors.add(collector);
}
} else {
sink.complete();
//noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = currentPageUnsortedCollectorManager.reduce(collectors);
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(),
s.pageIndex() + 1);
} catch (IOException ex) {
sink.error(ex);
return EMPTY_STATUS;
}
},
s -> {}
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, false)
);
});
} else {
sink.complete();
return EMPTY_STATUS;
}
}
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, false)
)
.transform(flux -> {
if (paginationInfo.forceSinglePage()
|| paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
return Flux.empty();
} else {
return flux;
}
});
return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(result.totalHits), firstPageHits
.concatWith(nextHits),