Fix deadlock
This commit is contained in:
parent
d8de969bee
commit
cc368aecc8
|
@ -8,17 +8,24 @@ import it.cavallium.dbengine.database.disk.LLIndexSearchers;
|
||||||
import it.cavallium.dbengine.lucene.LuceneUtils;
|
import it.cavallium.dbengine.lucene.LuceneUtils;
|
||||||
import it.cavallium.dbengine.lucene.collector.ReactiveCollectorMultiManager;
|
import it.cavallium.dbengine.lucene.collector.ReactiveCollectorMultiManager;
|
||||||
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.locks.LockSupport;
|
import java.util.concurrent.locks.LockSupport;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.ScoreDoc;
|
import org.apache.lucene.search.ScoreDoc;
|
||||||
|
import org.warp.commonutils.type.ShortNamedThreadFactory;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.FluxSink.OverflowStrategy;
|
import reactor.core.publisher.FluxSink.OverflowStrategy;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.scheduler.Scheduler;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
|
public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
|
||||||
|
|
||||||
|
private static final Scheduler SCHEDULER = Schedulers.fromExecutorService(Executors.newCachedThreadPool(
|
||||||
|
new ShortNamedThreadFactory("UnscoredStreamingSearcher")), "UnscoredStreamingSearcher");
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
|
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
|
||||||
LocalQueryParams queryParams,
|
LocalQueryParams queryParams,
|
||||||
|
@ -51,7 +58,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
|
||||||
var currentThread = Thread.currentThread();
|
var currentThread = Thread.currentThread();
|
||||||
var cmm = new ReactiveCollectorMultiManager(scoreDocsSink, currentThread);
|
var cmm = new ReactiveCollectorMultiManager(scoreDocsSink, currentThread);
|
||||||
|
|
||||||
// Unpark the paused request thread
|
//// Unpark the paused request thread
|
||||||
scoreDocsSink.onRequest(n -> LockSupport.unpark(currentThread));
|
scoreDocsSink.onRequest(n -> LockSupport.unpark(currentThread));
|
||||||
|
|
||||||
int mutableShardIndex = 0;
|
int mutableShardIndex = 0;
|
||||||
|
@ -63,9 +70,11 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
|
||||||
|
|
||||||
var executor = shard.getExecutor();
|
var executor = shard.getExecutor();
|
||||||
if (executor == null) {
|
if (executor == null) {
|
||||||
|
//noinspection BlockingMethodInNonBlockingContext
|
||||||
shard.search(localQueryParams.query(), collectorManager);
|
shard.search(localQueryParams.query(), collectorManager);
|
||||||
} else {
|
} else {
|
||||||
// Avoid using the index searcher executor to avoid blocking on its threads
|
// Avoid using the index searcher executor to avoid blocking on its threads
|
||||||
|
//noinspection BlockingMethodInNonBlockingContext
|
||||||
shard.search(localQueryParams.query(), collectorManager.newCollector());
|
shard.search(localQueryParams.query(), collectorManager.newCollector());
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
@ -74,7 +83,9 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
|
||||||
scoreDocsSink.complete();
|
scoreDocsSink.complete();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic());
|
}, OverflowStrategy.ERROR)
|
||||||
|
.limitRate(2048, 256)
|
||||||
|
.subscribeOn(SCHEDULER, true);
|
||||||
|
|
||||||
|
|
||||||
Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);
|
Flux<LLKeyScore> resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user