// Source: CavalliumDBEngine/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java

package it.cavallium.dbengine.lucene.searcher;
2021-07-06 01:30:37 +02:00
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
2021-07-06 01:30:37 +02:00
import java.io.IOException;
2021-09-09 23:27:39 +02:00
import java.util.Arrays;
import java.util.Objects;
import org.apache.lucene.search.IndexSearcher;
2021-07-17 23:06:26 +02:00
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
2021-07-17 23:06:26 +02:00
import org.apache.lucene.search.TopDocsCollector;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
@Override
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
2021-07-10 20:52:01 +02:00
Mono<Void> releaseIndexSearcher,
2021-07-06 01:30:37 +02:00
LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler) {
return Mono
.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
2021-07-06 01:30:37 +02:00
PaginationInfo paginationInfo;
if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.limit(), true);
} else {
2021-07-06 01:52:12 +02:00
paginationInfo = new PaginationInfo(queryParams.limit(), queryParams.offset(), FIRST_PAGE_LIMIT, false);
2021-07-06 01:30:37 +02:00
}
2021-07-17 23:06:26 +02:00
TopDocs firstPageTopDocs;
{
TopDocsCollector<ScoreDoc> firstPageCollector = TopDocsSearcher.getTopDocsCollector(
queryParams.sort(),
LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset() + paginationInfo.firstPageLimit()),
null,
LuceneUtils.totalHitsThreshold(),
2021-07-27 19:34:51 +02:00
!paginationInfo.forceSinglePage(),
queryParams.isScored());
//noinspection BlockingMethodInNonBlockingContext
2021-07-17 23:06:26 +02:00
indexSearcher.search(queryParams.query(), firstPageCollector);
firstPageTopDocs = firstPageCollector.topDocs(LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset()),
LuceneUtils.safeLongToInt(paginationInfo.firstPageLimit())
);
}
Flux<LLKeyScore> firstPageMono = LuceneUtils
2021-09-09 23:00:16 +02:00
.convertHits(Flux.fromArray(firstPageTopDocs.scoreDocs), IndexSearchers.unsharded(indexSearcher),
keyFieldName, scheduler, true)
.take(queryParams.limit(), true);
2021-07-06 01:30:37 +02:00
2021-07-18 19:37:24 +02:00
Flux<LLKeyScore> nextHits;
if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
nextHits = null;
} else {
nextHits = Flux.defer(() -> Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
if (s.last() != null && s.remainingLimit() > 0) {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
2021-09-09 23:00:16 +02:00
s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), true,
queryParams.isScored());
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(queryParams.query(), collector);
pageTopDocs = collector.topDocs();
} catch (IOException e) {
sink.error(e);
return EMPTY_STATUS;
}
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
} else {
sink.complete();
return EMPTY_STATUS;
}
},
s -> {}
)
.subscribeOn(scheduler)
2021-09-09 23:27:39 +02:00
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux,
IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler, true))
);
2021-07-18 19:37:24 +02:00
}
Flux<LLKeyScore> combinedFlux;
if (nextHits != null) {
combinedFlux = firstPageMono
.concatWith(nextHits);
} else {
combinedFlux = firstPageMono;
}
2021-07-06 01:30:37 +02:00
2021-08-04 01:12:39 +02:00
return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(firstPageTopDocs.totalHits), combinedFlux,
2021-07-18 19:37:24 +02:00
//.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
2021-07-10 20:52:01 +02:00
releaseIndexSearcher
2021-07-08 18:54:53 +02:00
);
})
.subscribeOn(scheduler);
}
}