CavalliumDBEngine/src/main/java/it/cavallium/dbengine/lucene/searcher/SimpleLuceneLocalSearcher.java

192 lines
7.3 KiB
Java
Raw Normal View History

package it.cavallium.dbengine.lucene.searcher;
2021-07-06 01:30:37 +02:00
import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.FIRST_PAGE_LIMIT;
import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT;

import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearcher;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLIndexSearchers.UnshardedIndexSearchers;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TotalHits;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Schedulers;

public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
@Override
2021-09-20 12:51:27 +02:00
public Mono<Send<LuceneSearchResult>> collect(Mono<Send<LLIndexSearcher>> indexSearcherMono,
2021-07-06 01:30:37 +02:00
LocalQueryParams queryParams,
2021-09-20 12:51:27 +02:00
String keyFieldName,
LLSearchTransformer transformer) {
2021-07-06 01:30:37 +02:00
2021-09-18 18:34:21 +02:00
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
PaginationInfo paginationInfo = getPaginationInfo(queryParams);
2021-09-20 12:51:27 +02:00
var indexSearchersMono = indexSearcherMono.map(LLIndexSearchers::unsharded);
2021-10-08 02:13:33 +02:00
return LLUtils.usingResource(indexSearchersMono, indexSearchers -> {
var queryParamsMono = transformer
.transform(Mono.fromSupplier(() -> new TransformerInput(indexSearchers, queryParams)));
return queryParamsMono.flatMap(queryParams2 -> this
// Search first page results
.searchFirstPage(indexSearchers.shards(), queryParams2, paginationInfo)
// Compute the results of the first page
.transform(firstPageTopDocsMono -> this.computeFirstPageResults(firstPageTopDocsMono, indexSearchers.shards(),
keyFieldName, queryParams2))
// Compute other results
.transform(firstResult -> this.computeOtherResults(firstResult, indexSearchers.shards(), queryParams2,
keyFieldName, indexSearchers::close))
// Ensure that one LuceneSearchResult is always returned
.single()
);
},
false);
}
/**
* Get the pagination info
*/
private PaginationInfo getPaginationInfo(LocalQueryParams queryParams) {
2021-09-18 18:34:21 +02:00
if (queryParams.limit() <= MAX_SINGLE_SEARCH_LIMIT) {
2021-09-20 18:20:59 +02:00
return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.pageLimits(), true);
2021-09-18 18:34:21 +02:00
} else {
2021-09-20 18:20:59 +02:00
return new PaginationInfo(queryParams.limit(), queryParams.offset(), queryParams.pageLimits(), false);
2021-09-18 18:34:21 +02:00
}
}
2021-07-06 01:30:37 +02:00
/**
* Search effectively the raw results of the first page
*/
2021-09-22 11:03:39 +02:00
private Mono<PageData> searchFirstPage(List<IndexSearcher> indexSearchers,
LocalQueryParams queryParams,
PaginationInfo paginationInfo) {
2021-09-20 18:20:59 +02:00
var limit = paginationInfo.totalLimit();
var pagination = !paginationInfo.forceSinglePage();
var resultsOffset = LuceneUtils.safeLongToInt(paginationInfo.firstPageOffset());
return Mono
.fromSupplier(() -> new CurrentPageInfo(null, limit, 0))
.handle((s, sink) -> this.searchPageSync(queryParams, indexSearchers, pagination, resultsOffset, s, sink));
}
/**
* Compute the results of the first page, extracting useful data
*/
private Mono<FirstPageResults> computeFirstPageResults(Mono<PageData> firstPageDataMono,
2021-09-22 11:03:39 +02:00
List<IndexSearcher> indexSearchers,
String keyFieldName,
LocalQueryParams queryParams) {
return firstPageDataMono.map(firstPageData -> {
var totalHitsCount = LuceneUtils.convertTotalHitsCount(firstPageData.topDocs().totalHits);
2021-09-22 11:03:39 +02:00
var scoreDocs = firstPageData.topDocs().scoreDocs;
assert LLUtils.isSet(scoreDocs);
2021-09-22 11:03:39 +02:00
Flux<LLKeyScore> firstPageHitsFlux = LuceneUtils.convertHits(Flux.fromArray(scoreDocs),
indexSearchers, keyFieldName, true)
.take(queryParams.limit(), true);
CurrentPageInfo nextPageInfo = firstPageData.nextPageInfo();
return new FirstPageResults(totalHitsCount, firstPageHitsFlux, nextPageInfo);
});
}
private Mono<Send<LuceneSearchResult>> computeOtherResults(Mono<FirstPageResults> firstResultMono,
2021-09-22 11:03:39 +02:00
List<IndexSearcher> indexSearchers,
LocalQueryParams queryParams,
2021-09-22 11:03:39 +02:00
String keyFieldName,
2021-10-01 19:17:33 +02:00
Runnable onClose) {
return firstResultMono.map(firstResult -> {
var totalHitsCount = firstResult.totalHitsCount();
var firstPageHitsFlux = firstResult.firstPageHitsFlux();
var secondPageInfo = firstResult.nextPageInfo();
Flux<LLKeyScore> nextHitsFlux = searchOtherPages(indexSearchers, queryParams, keyFieldName, secondPageInfo);
Flux<LLKeyScore> combinedFlux = firstPageHitsFlux.concatWith(nextHitsFlux);
2021-10-01 19:17:33 +02:00
return new LuceneSearchResult(totalHitsCount, combinedFlux, onClose).send();
});
}
2021-09-19 19:59:37 +02:00
/**
* Search effectively the merged raw results of the next pages
*/
2021-09-22 11:03:39 +02:00
private Flux<LLKeyScore> searchOtherPages(List<IndexSearcher> indexSearchers,
2021-09-19 19:59:37 +02:00
LocalQueryParams queryParams, String keyFieldName, CurrentPageInfo secondPageInfo) {
return Flux
.<PageData, CurrentPageInfo>generate(
() -> secondPageInfo,
(s, sink) -> searchPageSync(queryParams, indexSearchers, true, 0, s, sink),
s -> {}
)
.subscribeOn(Schedulers.boundedElastic())
.map(PageData::topDocs)
.flatMapIterable(topDocs -> Arrays.asList(topDocs.scoreDocs))
.transform(topFieldDocFlux -> LuceneUtils.convertHits(topFieldDocFlux, indexSearchers,
keyFieldName, true));
}
/**
*
* @param resultsOffset offset of the resulting topDocs. Useful if you want to
* skip the first n results in the first page
*/
private CurrentPageInfo searchPageSync(LocalQueryParams queryParams,
2021-09-22 11:03:39 +02:00
List<IndexSearcher> indexSearchers,
boolean allowPagination,
int resultsOffset,
CurrentPageInfo s,
SynchronousSink<PageData> sink) {
LLUtils.ensureBlocking();
if (resultsOffset < 0) {
throw new IndexOutOfBoundsException(resultsOffset);
}
2021-09-20 18:20:59 +02:00
var currentPageLimit = queryParams.pageLimits().getPageLimit(s.pageIndex());
if ((s.pageIndex() == 0 || s.last() != null) && s.remainingLimit() > 0) {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
2021-09-20 18:20:59 +02:00
currentPageLimit, s.last(), LuceneUtils.totalHitsThreshold(),
allowPagination, queryParams.isScored());
2021-09-22 11:03:39 +02:00
indexSearchers.get(0).search(queryParams.query(), collector);
if (resultsOffset > 0) {
2021-09-20 18:20:59 +02:00
pageTopDocs = collector.topDocs(resultsOffset, currentPageLimit);
} else {
pageTopDocs = collector.topDocs();
}
} catch (IOException e) {
sink.error(e);
return EMPTY_STATUS;
}
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
long nextRemainingLimit;
if (allowPagination) {
2021-09-20 18:20:59 +02:00
nextRemainingLimit = s.remainingLimit() - currentPageLimit;
} else {
nextRemainingLimit = 0L;
}
var nextPageIndex = s.pageIndex() + 1;
var nextPageInfo = new CurrentPageInfo(pageLastDoc, nextRemainingLimit, nextPageIndex);
sink.next(new PageData(pageTopDocs, nextPageInfo));
return nextPageInfo;
} else {
sink.complete();
return EMPTY_STATUS;
}
}
}