This commit is contained in:
Andrea Cavalli 2021-12-16 02:47:52 +01:00
parent 01099cc4d1
commit 2e1678373c

View File

@ -33,7 +33,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
private static final int SEARCH_THREADS = Math.min(Math.max(8, Runtime.getRuntime().availableProcessors()), 128);
private static final ThreadFactory THREAD_FACTORY = new ShortNamedThreadFactory("UnscoredStreamingSearcher");
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY);
private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(); // Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY);
@Override
public Mono<LuceneSearchResult> collectMulti(Mono<Send<LLIndexSearchers>> indexSearchersMono,
@ -81,11 +81,12 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
AtomicReference<Thread> threadAtomicReference = new AtomicReference<>();
var disposable = EXECUTOR.submit(() -> {
try {
LLUtils.ensureBlocking();
threadAtomicReference.set(Thread.currentThread());
int shardIndexTemp = 0;
for (IndexSearcher shard : shards) {
try {
if (sink.isCancelled()) break;
final int shardIndex = shardIndexTemp;
var collector = withTimeout(new SimpleCollector() {
private LeafReaderContext leafReaderContext;
@ -116,12 +117,12 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher {
}
}, localQueryParams.timeout());
shard.search(localQueryParams.query(), collector);
sink.complete();
shardIndexTemp++;
}
} catch (Throwable e) {
sink.error(e);
}
shardIndexTemp++;
}
sink.complete();
});
sink.onRequest(lc -> LockSupport.unpark(threadAtomicReference.get()));
sink.onDispose(() -> disposable.cancel(false));