diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java index 5102309..7a8d188 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java @@ -33,7 +33,7 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { private static final int SEARCH_THREADS = Math.min(Math.max(8, Runtime.getRuntime().availableProcessors()), 128); private static final ThreadFactory THREAD_FACTORY = new ShortNamedThreadFactory("UnscoredStreamingSearcher"); - private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY); + private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(); // Executors.newFixedThreadPool(SEARCH_THREADS, THREAD_FACTORY); @Override public Mono collectMulti(Mono> indexSearchersMono, @@ -81,11 +81,12 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { AtomicReference threadAtomicReference = new AtomicReference<>(); var disposable = EXECUTOR.submit(() -> { - LLUtils.ensureBlocking(); - threadAtomicReference.set(Thread.currentThread()); - int shardIndexTemp = 0; - for (IndexSearcher shard : shards) { - try { + try { + LLUtils.ensureBlocking(); + threadAtomicReference.set(Thread.currentThread()); + int shardIndexTemp = 0; + for (IndexSearcher shard : shards) { + if (sink.isCancelled()) break; final int shardIndex = shardIndexTemp; var collector = withTimeout(new SimpleCollector() { private LeafReaderContext leafReaderContext; @@ -116,12 +117,12 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { } }, localQueryParams.timeout()); shard.search(localQueryParams.query(), collector); - sink.complete(); - } catch (Throwable e) { - sink.error(e); + shardIndexTemp++; } - shardIndexTemp++; + } catch (Throwable e) { + sink.error(e); } + sink.complete(); }); sink.onRequest(lc -> LockSupport.unpark(threadAtomicReference.get())); sink.onDispose(() -> disposable.cancel(false));