Avoid nesting schedulers inside generators

This commit is contained in:
Andrea Cavalli 2021-09-08 22:32:08 +02:00
parent 2b21e6a864
commit 6926292904
6 changed files with 74 additions and 79 deletions

View File

@ -12,23 +12,17 @@ import reactor.core.scheduler.Schedulers;
public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
@Override
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler) {
//noinspection BlockingMethodInNonBlockingContext
return Mono
.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
return new LuceneSearchResult(
TotalHitsCount.of(indexSearcher.count(queryParams.query()), true),
Flux.empty(),
releaseIndexSearcher);
}
)
.subscribeOn(scheduler);
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher, Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) {
return Mono.fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
//noinspection BlockingMethodInNonBlockingContext
return new LuceneSearchResult(TotalHitsCount.of(indexSearcher.count(queryParams.query()), true),
Flux.empty(),
releaseIndexSearcher
);
}).subscribeOn(scheduler);
}
}

View File

@ -47,7 +47,7 @@ public class CountLuceneMultiSearcher implements LuceneMultiSearcher {
Flux.empty(),
Mono.when(release)
);
});
}).subscribeOn(scheduler);
}
};
});

View File

@ -141,6 +141,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
}
return s;
})
.subscribeOn(collectorScheduler)
.transform(flux -> {
if (paginationInfo.forceSinglePage()
|| paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
@ -149,7 +150,6 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
return flux;
}
})
.subscribeOn(collectorScheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true)
);

View File

@ -68,43 +68,44 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
nextHits = null;
} else {
nextHits = Flux.defer(() -> Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
if (s.last() != null && s.remainingLimit() > 0) {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
s.currentPageLimit(),
s.last(),
LuceneUtils.totalHitsThreshold(),
true,
queryParams.isScored()
);
indexSearcher.search(queryParams.query(), collector);
pageTopDocs = collector.topDocs();
} catch (IOException e) {
sink.error(e);
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
if (s.last() != null && s.remainingLimit() > 0) {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
s.currentPageLimit(),
s.last(),
LuceneUtils.totalHitsThreshold(),
true,
queryParams.isScored()
);
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(queryParams.query(), collector);
pageTopDocs = collector.topDocs();
} catch (IOException e) {
sink.error(e);
return EMPTY_STATUS;
}
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
} else {
sink.complete();
return EMPTY_STATUS;
}
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
} else {
sink.complete();
return EMPTY_STATUS;
}
},
s -> {}
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler, true)
)
);
},
s -> {}
)
.subscribeOn(scheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler, true)
)
);
}
Flux<LLKeyScore> combinedFlux;

View File

@ -5,6 +5,7 @@ import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.EMPTY_STATUS
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ -98,30 +99,28 @@ class UnscoredPagedLuceneShardSearcher implements LuceneShardSearcher {
() -> TopDocsSearcher.getTopDocsCollector(queryParams.sort(), s.currentPageLimit(),
s.last(), LuceneUtils.totalHitsThreshold(), true, queryParams.isScored()),
0, s.currentPageLimit(), queryParams.sort());
//noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = Flux
.fromIterable(indexSearchersArray)
.flatMapSequential(indexSearcher -> Mono
.fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
var collector = currentPageUnsortedCollectorManager.newCollector();
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
return collector;
})
.subscribeOn(scheduler)
)
.collect(Collectors.toCollection(ObjectArrayList::new))
.flatMap(collectors -> Mono
.fromCallable(() -> currentPageUnsortedCollectorManager.reduce(collectors))
.subscribeOn(scheduler)
)
.blockOptional().orElseThrow();
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(),
s.pageIndex() + 1);
try {
var collectors = new ObjectArrayList<TopDocsCollector<ScoreDoc>>(indexSearchersArray.size());
for (IndexSearcher indexSearcher : indexSearchersArray) {
//noinspection BlockingMethodInNonBlockingContext
var collector = currentPageUnsortedCollectorManager.newCollector();
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
collectors.add(collector);
}
//noinspection BlockingMethodInNonBlockingContext
TopDocs pageTopDocs = currentPageUnsortedCollectorManager.reduce(collectors);
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(),
s.pageIndex() + 1);
} catch (IOException ex) {
sink.error(ex);
return EMPTY_STATUS;
}
} else {
sink.complete();
return EMPTY_STATUS;

View File

@ -175,7 +175,8 @@ public class UnscoredUnsortedContinuousLuceneMultiSearcher implements LuceneMult
));
return new LuceneSearchResult(TotalHitsCount.of(0, false), resultsFlux, release);
});
})
.subscribeOn(scheduler);
}
};
});