Use blocking generator instead of Flux.create

This commit is contained in:
Andrea Cavalli 2021-09-08 22:16:06 +02:00
parent e12e240487
commit 2b21e6a864

View File

@ -6,6 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ -82,7 +83,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
TopDocs result;
Mono<Void> release;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
result = firstPageSharedManager.reduce(collectors);
release = Mono.when(indexSearcherReleasersArray);
}
@ -93,30 +94,16 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
Flux<LLKeyScore> firstPageHits = LuceneUtils
.convertHits(result.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true);
Flux<LLKeyScore> nextHits = Flux.defer(() -> {
if (paginationInfo.forceSinglePage()
|| paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
return Flux.empty();
}
return Flux
.<TopDocs>create(emitter -> {
Flux<LLKeyScore> nextHits;
nextHits = Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, emitter) -> {
if (Schedulers.isInNonBlockingThread()) {
emitter.error(new UnsupportedOperationException("Called collect in a nonblocking thread"));
return;
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
}
Empty<Void> cancelEvent = Sinks.empty();
AtomicReference<CurrentPageInfo> currentPageInfoAtomicReference = new AtomicReference<>(new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs),
paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1));
emitter.onRequest(requests -> {
if (Schedulers.isInNonBlockingThread()) {
emitter.error(new UnsupportedOperationException("Called collect"
+ ", onRequest in a nonblocking thread"));
return;
}
synchronized (currentPageInfoAtomicReference) {
var s = currentPageInfoAtomicReference.get();
while (requests > 0 && !emitter.isCancelled()) {
requests--;
if (s.last() != null && s.remainingLimit() > 0) {
Sort luceneSort = queryParams.sort();
if (luceneSort == null) {
@ -126,57 +113,46 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
= new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(),
(FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit());
TopDocs pageTopDocs = Flux
.fromIterable(indexSearchersArray)
.index()
.<TopFieldCollector>handle((tuple, sink) -> {
try {
IndexSearcher indexSearcher = tuple.getT2();
var collectors = new ObjectArrayList<TopFieldCollector>(indexSearchersArray.size());
for (IndexSearcher indexSearcher : indexSearchersArray) {
//noinspection BlockingMethodInNonBlockingContext
TopFieldCollector collector = sharedManager.newCollector();
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(luceneQuery, collector);
sink.next(collector);
} catch (Exception ex) {
sink.error(ex);
collectors.add(collector);
}
})
.collect(Collectors.toCollection(ObjectArrayList::new))
.<TopDocs>handle((collectors, sink) -> {
try {
sink.next(sharedManager.reduce(collectors));
} catch (Exception ex) {
sink.error(ex);
}
})
.single()
.takeUntilOther(cancelEvent.asMono())
.subscribeOn(Schedulers.immediate())
.block();
if (!emitter.isCancelled()) {
Objects.requireNonNull(pageTopDocs);
//noinspection BlockingMethodInNonBlockingContext
var pageTopDocs = sharedManager.reduce(collectors);
var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs);
emitter.next(pageTopDocs);
s = new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
} else {
s = new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(),
s.pageIndex() + 1);
} catch (IOException ex) {
emitter.error(ex);
s = EMPTY_STATUS;
requests = 0;
}
} else {
emitter.complete();
s = EMPTY_STATUS;
requests = 0;
}
return s;
})
.transform(flux -> {
if (paginationInfo.forceSinglePage()
|| paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
return Flux.empty();
} else {
return flux;
}
currentPageInfoAtomicReference.set(s);
}
});
emitter.onCancel(cancelEvent::tryEmitEmpty);
})
.subscribeOn(collectorScheduler)
.flatMapSequential(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true)
);
});
return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(result.totalHits),
firstPageHits