From 6fe9f9c24f736551237efd267b495984070562c3 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 25 Aug 2021 10:23:42 +0200 Subject: [PATCH] Fix hanging on ScoredSimpleLuceneShardSearcher --- .../lucene/searcher/LuceneShardSearcher.java | 4 +- .../ScoredSimpleLuceneShardSearcher.java | 118 ++++++++++-------- 2 files changed, 71 insertions(+), 51 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java index 9bd78bb..e9ab9ca 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneShardSearcher.java @@ -20,7 +20,7 @@ public interface LuceneShardSearcher { /** * @param queryParams the query parameters * @param keyFieldName the name of the key field - * @param scheduler a blocking scheduler + * @param collectorScheduler a blocking scheduler */ - Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler); + Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java index b3f721e..24bd9a0 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredSimpleLuceneShardSearcher.java @@ -8,6 +8,9 @@ import it.cavallium.dbengine.lucene.LuceneUtils; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.util.ArrayList; import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.FieldDoc; @@ -20,6 +23,8 @@ import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopFieldDocs; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Empty; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -61,7 +66,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { } @Override - public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) { + public Mono collect(LocalQueryParams queryParams, String keyFieldName, Scheduler collectorScheduler) { if (!queryParams.isScored()) { return Mono.error( new UnsupportedOperationException("Can't execute an unscored query with a scored lucene shard searcher") @@ -72,7 +77,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { TopDocs result; Mono release; synchronized (lock) { - //noinspection BlockingMethodInNonBlockingContext + result = firstPageSharedManager.reduce(collectors); release = Mono.when(indexSearcherReleasersArray); } @@ -81,7 +86,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { indexSearchers = IndexSearchers.of(indexSearchersArray); } Flux firstPageHits = LuceneUtils - .convertHits(result.scoreDocs, indexSearchers, keyFieldName, scheduler, true); + .convertHits(result.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true); Flux nextHits = Flux.defer(() -> { if (paginationInfo.forceSinglePage() @@ -89,54 +94,69 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { return Flux.empty(); } return Flux - .generate( - () -> new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs), - paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1), - (s, sink) -> { - if (s.last() != null && s.remainingLimit() > 0) { - Sort luceneSort = queryParams.sort(); - if (luceneSort == null) { - luceneSort = Sort.RELEVANCE; + .create(emitter -> { + Empty cancelEvent = Sinks.empty(); + AtomicReference currentPageInfoAtomicReference = new AtomicReference<>(new CurrentPageInfo(LuceneUtils.getLastFieldDoc(result.scoreDocs), + paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1)); + emitter.onRequest(requests -> { + synchronized (currentPageInfoAtomicReference) { + var s = currentPageInfoAtomicReference.get(); + while (requests > 0 && !emitter.isCancelled()) { + requests--; + if (s.last() != null && s.remainingLimit() > 0) { + Sort luceneSort = queryParams.sort(); + if (luceneSort == null) { + luceneSort = Sort.RELEVANCE; + } + CollectorManager sharedManager + = new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(), + (FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit()); + + TopDocs pageTopDocs = Flux + .fromIterable(indexSearchersArray) + .index() + .flatMapSequential(tuple -> Mono + .fromCallable(() -> { + long shardIndex = tuple.getT1(); + IndexSearcher indexSearcher = tuple.getT2(); + TopFieldCollector collector = sharedManager.newCollector(); + indexSearcher.search(luceneQuery, collector); + return collector; + }) + .subscribeOn(Schedulers.immediate()) + ) + .collect(Collectors.toCollection(ObjectArrayList::new)) + .flatMap(collectors -> Mono + .fromCallable(() -> sharedManager.reduce(collectors)) + .subscribeOn(Schedulers.immediate()) + ) + .single() + .takeUntilOther(cancelEvent.asMono()) + .block(); + if (!emitter.isCancelled()) { + Objects.requireNonNull(pageTopDocs); + var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs); + emitter.next(pageTopDocs); + s = new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); + } else { + s = EMPTY_STATUS; + requests = 0; + } + } else { + emitter.complete(); + s = EMPTY_STATUS; + requests = 0; } - CollectorManager sharedManager - = new ScoringShardsCollectorManager(luceneSort, s.currentPageLimit(), - (FieldDoc) s.last(), LuceneUtils.totalHitsThreshold(), 0, s.currentPageLimit()); - //noinspection BlockingMethodInNonBlockingContext - TopDocs pageTopDocs = Flux - .fromIterable(indexSearchersArray) - .index() - .flatMapSequential(tuple -> Mono - .fromCallable(() -> { - long shardIndex = tuple.getT1(); - IndexSearcher indexSearcher = tuple.getT2(); - //noinspection BlockingMethodInNonBlockingContext - TopFieldCollector collector = sharedManager.newCollector(); - //noinspection BlockingMethodInNonBlockingContext - indexSearcher.search(luceneQuery, collector); - return collector; - }) - .subscribeOn(scheduler) - ) - .collect(Collectors.toCollection(ObjectArrayList::new)) - .flatMap(collectors -> Mono.fromCallable(() -> { - //noinspection BlockingMethodInNonBlockingContext - return sharedManager.reduce(collectors); - }).subscribeOn(scheduler)) - .subscribeOn(Schedulers.immediate()) - .blockOptional().orElseThrow(); - var pageLastDoc = LuceneUtils.getLastFieldDoc(pageTopDocs.scoreDocs); - sink.next(pageTopDocs); - return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1); - } else { - sink.complete(); - return EMPTY_STATUS; } - }, - s -> {} - ) - .subscribeOn(scheduler) + currentPageInfoAtomicReference.set(s); + } + }); + + emitter.onCancel(cancelEvent::tryEmitEmpty); + }) + .subscribeOn(collectorScheduler) .flatMapSequential(topFieldDoc -> LuceneUtils - .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, scheduler, true) + .convertHits(topFieldDoc.scoreDocs, indexSearchers, keyFieldName, collectorScheduler, true) ); }); @@ -147,7 +167,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher { release ); }) - .subscribeOn(scheduler); + .subscribeOn(Schedulers.boundedElastic()); } }