Various searchers optimizations

This commit is contained in:
Andrea Cavalli 2021-09-09 23:00:16 +02:00
parent 6dda099cc1
commit f157e67818
6 changed files with 39 additions and 64 deletions

View File

@@ -363,27 +363,27 @@ public class LuceneUtils {
);
}
public static Flux<LLKeyScore> convertHits(ScoreDoc[] hits,
public static Flux<LLKeyScore> convertHits(Flux<ScoreDoc> hits,
IndexSearchers indexSearchers,
String keyFieldName,
Scheduler scheduler,
boolean preserveOrder) {
return Flux
.fromArray(hits)
.transform(hitsFlux -> {
if (preserveOrder) {
return hitsFlux
.flatMapSequential(hit -> Mono
.fromCallable(() -> mapHitBlocking(hit, indexSearchers, keyFieldName))
.subscribeOn(scheduler)
);
} else {
return hitsFlux.flatMap(hit -> Mono
.fromCallable(() -> mapHitBlocking(hit, indexSearchers, keyFieldName))
.subscribeOn(scheduler));
}
});
return hits.transform(hitsFlux -> {
if (preserveOrder) {
return hitsFlux.flatMapSequential(hit -> Mono
.fromCallable(() -> mapHitBlocking(hit, indexSearchers, keyFieldName))
.subscribeOn(scheduler),
2
);
} else {
return hitsFlux.flatMap(hit -> Mono
.fromCallable(() -> mapHitBlocking(hit, indexSearchers, keyFieldName))
.subscribeOn(scheduler),
2
);
}
});
}
@Nullable

View File

@@ -92,7 +92,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
indexSearchers = IndexSearchers.of(indexSearchersArray);
}
Flux<LLKeyScore> firstPageHits = LuceneUtils
.convertHits(result.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true);
.convertHits(Flux.fromArray(result.scoreDocs), indexSearchers, keyFieldName, collectorScheduler, true);
Flux<LLKeyScore> nextHits;
nextHits = Flux
@@ -151,7 +151,9 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
}
})
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true)
.convertHits(Flux.fromArray(topFieldDoc.scoreDocs), indexSearchers,
keyFieldName, collectorScheduler, true),
2
);
return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(result.totalHits),

View File

@@ -77,11 +77,7 @@ public class ScoringShardsCollectorManager implements CollectorManager<TopFieldC
}
i++;
}
result = LuceneUtils.mergeTopDocs(sort, startN,
topN,
topDocs,
TIE_BREAKER
);
result = LuceneUtils.mergeTopDocs(sort, startN, topN, topDocs, TIE_BREAKER);
} else {
TopDocs[] topDocs = new TopDocs[collectors.size()];
var i = 0;
@@ -92,11 +88,7 @@ public class ScoringShardsCollectorManager implements CollectorManager<TopFieldC
}
i++;
}
result = LuceneUtils.mergeTopDocs(null, startN,
topN,
topDocs,
TIE_BREAKER
);
result = LuceneUtils.mergeTopDocs(null, startN, topN, topDocs, TIE_BREAKER);
}
return result;
}

View File

@@ -53,13 +53,8 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
);
}
Flux<LLKeyScore> firstPageMono = LuceneUtils
.convertHits(
firstPageTopDocs.scoreDocs,
IndexSearchers.unsharded(indexSearcher),
keyFieldName,
scheduler,
true
)
.convertHits(Flux.fromArray(firstPageTopDocs.scoreDocs), IndexSearchers.unsharded(indexSearcher),
keyFieldName, scheduler, true)
.take(queryParams.limit(), true);
@@ -78,12 +73,8 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
s.currentPageLimit(),
s.last(),
LuceneUtils.totalHitsThreshold(),
true,
queryParams.isScored()
);
s.currentPageLimit(), s.last(), LuceneUtils.totalHitsThreshold(), true,
queryParams.isScored());
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(queryParams.query(), collector);
pageTopDocs = collector.topDocs();
@@ -102,9 +93,8 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
s -> {}
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler, true)
)
.flatMapSequential(topFieldDoc -> LuceneUtils.convertHits(Flux.fromArray(topFieldDoc.scoreDocs),
IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler, true), 2)
);
}

View File

@@ -82,17 +82,17 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
indexSearchers = IndexSearchers.of(indexSearchersArray);
}
Flux<LLKeyScore> firstPageHits = LuceneUtils
.convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler, false);
.convertHits(Flux.fromArray(result.scoreDocs), indexSearchers, keyFieldName, scheduler, false);
Flux<LLKeyScore> nextHits = Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
int perShardCollectorLimit = s.currentPageLimit() / indexSearchersArray.size();
if (s.last() != null && s.remainingLimit() > 0 && s.currentPageLimit() > 0) {
Objects.requireNonNull(queryParams.scoreMode(), "ScoreMode must not be null");
Query luceneQuery = queryParams.query();
int perShardCollectorLimit = s.currentPageLimit() / indexSearchersArray.size();
UnscoredTopDocsCollectorManager currentPageUnsortedCollectorManager
= new UnscoredTopDocsCollectorManager(
() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), perShardCollectorLimit,
@@ -127,9 +127,8 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
}
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, false)
)
.flatMapSequential(topFieldDoc -> LuceneUtils.convertHits(Flux.fromArray(topFieldDoc.scoreDocs),
indexSearchers, keyFieldName, scheduler, false), 2)
.transform(flux -> {
if (paginationInfo.forceSinglePage()
|| paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {

View File

@@ -158,21 +158,13 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult
AtomicBoolean resultsAlreadySubscribed = new AtomicBoolean(false);
var resultsFlux = Mono
.<Void>fromCallable(() -> {
if (!resultsAlreadySubscribed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("Already subscribed!");
}
return null;
})
.thenMany(scoreDocsSink.asFlux())
.buffer(1024, ObjectArrayList::new)
.flatMap(scoreDocs -> LuceneUtils.convertHits(scoreDocs.toArray(ScoreDoc[]::new),
indexSearchers,
keyFieldName,
scheduler,
false
));
var scoreDocsFlux = Mono.<Void>fromCallable(() -> {
if (!resultsAlreadySubscribed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("Already subscribed!");
}
return null;
}).thenMany(scoreDocsSink.asFlux());
var resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, indexSearchers, keyFieldName, scheduler, false);
return new LuceneSearchResult(TotalHitsCount.of(0, false), resultsFlux, release);
})