From cc368aecc8c10ccc34356626f1090a0c60d07bd2 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 29 Nov 2021 14:15:31 +0100 Subject: [PATCH] Fix deadlock --- .../UnsortedUnscoredStreamingMultiSearcher.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java index d17c249..0b79bdf 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java @@ -8,17 +8,24 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.ReactiveCollectorMultiManager; import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; +import org.warp.commonutils.type.ShortNamedThreadFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { + private static final Scheduler SCHEDULER = Schedulers.fromExecutorService(Executors.newCachedThreadPool( + new ShortNamedThreadFactory("UnscoredStreamingSearcher")), "UnscoredStreamingSearcher"); + @Override public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, @@ -51,7 +58,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { var currentThread = Thread.currentThread(); var cmm = new ReactiveCollectorMultiManager(scoreDocsSink, currentThread); - // Unpark the paused request thread + //// Unpark the paused request thread scoreDocsSink.onRequest(n -> LockSupport.unpark(currentThread)); int mutableShardIndex = 0; @@ -63,9 +70,11 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { var executor = shard.getExecutor(); if (executor == null) { + //noinspection BlockingMethodInNonBlockingContext shard.search(localQueryParams.query(), collectorManager); } else { // Avoid using the index searcher executor to avoid blocking on its threads + //noinspection BlockingMethodInNonBlockingContext shard.search(localQueryParams.query(), collectorManager.newCollector()); } } catch (Throwable e) { @@ -74,7 +83,9 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { scoreDocsSink.complete(); } } - }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic()); + }, OverflowStrategy.ERROR) + .limitRate(2048, 256) + .subscribeOn(SCHEDULER, true); Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);