From 2b21e6a8645891c423b614298edc97d0df9c191d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 8 Sep 2021 22:16:06 +0200 Subject: [PATCH] Use blocking generator instead of Flux.create --- .../ScoredSimpleLuceneShardSearcher.java | 136 ++++++++---------- 1 file changed, 56 insertions(+), 80 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index f504a88..ed321da 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -6,6 +6,7 @@ import static it.cavallium.dbengine.lucene.searcher.CurrentPageInfo.TIE_BREAKER; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.lucene.LuceneUtils; import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -82,7 +83,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { TopDocs result; Mono release; synchronized (lock) { - + //noinspection BlockingMethodInNonBlockingContext result = firstPageSharedManager.reduce(collectors); release = Mono.when(indexSearcherReleasersArray); } @@ -93,90 +94,65 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { Flux firstPageHits = LuceneUtils .convertHits(result.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true); - Flux nextHits = Flux.defer(() -> { - if (paginationInfo.forceSinglePage() - || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { - return Flux.empty(); - } - return Flux - .create(emitter -> { - if (Schedulers.isInNonBlockingThread()) { - emitter.error(new UnsupportedOperationException("Called collect in a nonblocking thread")); - return; - } - Empty cancelEvent = Sinks.empty(); - AtomicReference currentPageInfoAtomicReference = new AtomicReference<>(new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs), - paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1)); - emitter.onRequest(requests -> { + Flux nextHits; + nextHits = Flux + .generate( + () -> new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs), + paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), + (s, emitter) -> { if (Schedulers.isInNonBlockingThread()) { - emitter.error(new UnsupportedOperationException("Called collect" - + ", onRequest in a nonblocking thread")); - return; + throw new UnsupportedOperationException("Called collect in a nonblocking thread"); } - synchronized (currentPageInfoAtomicReference) { - var s = currentPageInfoAtomicReference.get(); - while (requests > 0 && !emitter.isCancelled()) { - requests--; - if (s.last() != null && s.remainingLimit() > 0) { - Sort luceneSort = queryParams.sort(); - if (luceneSort == null) { - luceneSort = Sort.RELEVANCE; - } - CollectorManager sharedManager - = new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(), - (FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit()); - TopDocs pageTopDocs = Flux - .fromIterable(indexSearchersArray) - .index() - .handle((tuple, sink) -> { - try { - IndexSearcher indexSearcher = tuple.getT2(); - TopFieldCollector collector = sharedManager.newCollector(); - indexSearcher.search(luceneQuery, collector); - sink.next(collector); - } catch (Exception ex) { - sink.error(ex); - } - }) - .collect(Collectors.toCollection(ObjectArrayList::new)) - .handle((collectors, sink) -> { - try { - sink.next(sharedManager.reduce(collectors)); - } catch (Exception ex) { - sink.error(ex); - } - }) - .single() - .takeUntilOther(cancelEvent.asMono()) - .subscribeOn(Schedulers.immediate()) - .block(); - if (!emitter.isCancelled()) { - Objects.requireNonNull(pageTopDocs); - var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs); - emitter.next(pageTopDocs); - s = new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); - } else { - s = EMPTY_STATUS; - requests = 0; - } - } else { - emitter.complete(); - s = EMPTY_STATUS; - requests = 0; - } + if (s.last() != null && s.remainingLimit() > 0) { + Sort luceneSort = queryParams.sort(); + if (luceneSort == null) { + luceneSort = Sort.RELEVANCE; } - currentPageInfoAtomicReference.set(s); - } - }); + CollectorManager sharedManager + = new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(), + (FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit()); - emitter.onCancel(cancelEvent::tryEmitEmpty); - }) - .subscribeOn(collectorScheduler) - .flatMapSequential(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true) - ); - }); + try { + var collectors = new ObjectArrayList(indexSearchersArray.size()); + for (IndexSearcher indexSearcher : indexSearchersArray) { + //noinspection BlockingMethodInNonBlockingContext + TopFieldCollector collector = sharedManager.newCollector(); + //noinspection BlockingMethodInNonBlockingContext + indexSearcher.search(luceneQuery, collector); + + collectors.add(collector); + } + + //noinspection BlockingMethodInNonBlockingContext + var pageTopDocs = sharedManager.reduce(collectors); + var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs); + emitter.next(pageTopDocs); + + s = new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), + s.pageIndex() + 1); + } catch (IOException ex) { + emitter.error(ex); + s = EMPTY_STATUS; + } + } else { + emitter.complete(); + s = EMPTY_STATUS; + } + return s; + }) + .transform(flux -> { + if (paginationInfo.forceSinglePage() + || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) { + return Flux.empty(); + } else { + return flux; + } + }) + .subscribeOn(collectorScheduler) + .flatMapSequential(topFieldDoc -> LuceneUtils + .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true) + ); return new LuceneSearchResult(LuceneUtils.convertTotalHitsCount(result.totalHits), firstPageHits