Fix scheduling

This commit is contained in:
Andrea Cavalli 2021-07-06 02:23:06 +02:00
parent 7cebcd7e92
commit e7e25e646d
4 changed files with 9 additions and 7 deletions

View File

@ -88,7 +88,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene",
Integer.MAX_VALUE,
false
true
);
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
private final Scheduler luceneSearcherScheduler = Schedulers.newBoundedElastic(
@ -96,7 +96,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene-searcher",
60,
false
true
);
private final String luceneIndexName;

View File

@ -8,7 +8,7 @@ import org.jetbrains.annotations.Nullable;
record CurrentPageInfo(@Nullable ScoreDoc last, long remainingLimit, int pageIndex) {
private static final int MAX_ITEMS_PER_PAGE = 100;
private static final int MAX_ITEMS_PER_PAGE = 500;
public static final Comparator<ScoreDoc> TIE_BREAKER = Comparator.comparingInt((d) -> d.shardIndex);
public static final CurrentPageInfo EMPTY_STATUS = new CurrentPageInfo(null, 0, 0);

View File

@ -25,6 +25,7 @@ import org.apache.lucene.search.TopFieldDocs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher {
@ -141,7 +142,7 @@ class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher {
)
.collect(Collectors.toCollection(ObjectArrayList::new))
.map(topFieldDocs -> topFieldDocs.toArray(TopFieldDocs[]::new))
.map(topFieldDocs -> {
.flatMap(topFieldDocs -> Mono.fromCallable(() -> {
if (queryParams.sort() != null) {
return TopDocs.merge(queryParams.sort(), 0, s.currentPageLimit(),
topFieldDocs,
@ -153,7 +154,8 @@ class FieldSimpleLuceneShardSearcher implements LuceneShardSearcher {
TIE_BREAKER
);
}
})
}).subscribeOn(scheduler))
.subscribeOn(Schedulers.immediate())
.blockOptional().orElseThrow();
var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);

View File

@ -118,10 +118,10 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
)
.collect(Collectors.toCollection(ObjectArrayList::new))
.map(topFieldDocs -> topFieldDocs.toArray(TopDocs[]::new))
.map(topFieldDocs -> TopDocs.merge(0, s.currentPageLimit(),
.flatMap(topFieldDocs -> Mono.fromCallable(() -> TopDocs.merge(0, s.currentPageLimit(),
topFieldDocs,
TIE_BREAKER
))
)).subscribeOn(scheduler))
.blockOptional().orElseThrow();
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);