diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 22eabd8..71d4dc2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -34,8 +34,7 @@ import reactor.core.scheduler.Schedulers; public class CachedIndexSearcherManager implements IndexSearcherManager { private static final Logger logger = LoggerFactory.getLogger(CachedIndexSearcherManager.class); - private final Executor SEARCH_EXECUTOR = Executors - .newCachedThreadPool(new ShortNamedThreadFactory("lucene-search")); + private final Executor SEARCH_EXECUTOR = command -> Schedulers.boundedElastic().schedule(command); private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR); private final SnapshotsManager snapshotsManager; @@ -128,7 +127,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { logger.info("Closed active searchers"); cachedSnapshotSearchers.invalidateAll(); cachedSnapshotSearchers.cleanUp(); - })).cache(); + }).subscribeOn(Schedulers.boundedElastic())).cache(); } private Mono> generateCachedSearcher(@Nullable LLSnapshot snapshot) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 29a3679..e8128ca 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -277,35 +277,29 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } private Mono runSafe(Callable callable) { - return Mono.create(sink -> { - var future = SAFE_EXECUTOR.submit(() -> { - try { - var result = callable.call(); - if (result != null) { - sink.success(result); - } else { - sink.success(); - } - } catch (Throwable e) { - sink.error(e); + return Mono.create(sink -> Schedulers.boundedElastic().schedule(() -> { + try { + var result = callable.call(); + if (result != null) { + sink.success(result); + } else { + sink.success(); } - }); - sink.onDispose(() -> future.cancel(false)); - }); + } catch (Throwable e) { + sink.error(e); + } + })); } private Mono runSafe(IORunnable runnable) { - return Mono.create(sink -> { - var future = SAFE_EXECUTOR.submit(() -> { - try { - runnable.run(); - sink.success(); - } catch (Throwable e) { - sink.error(e); - } - }); - sink.onDispose(() -> future.cancel(false)); - }); + return Mono.create(sink -> Schedulers.boundedElastic().schedule(() -> { + try { + runnable.run(); + sink.success(); + } catch (Throwable e) { + sink.error(e); + } + })); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java index f389dad..5413418 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/collector/ReactiveCollectorMultiManager.java @@ -15,15 +15,13 @@ import reactor.core.publisher.Sinks.Many; public class ReactiveCollectorMultiManager implements CollectorMultiManager { - private final FluxSink scoreDocsSink; - private final LongSemaphore requested; - public ReactiveCollectorMultiManager(FluxSink scoreDocsSink, LongSemaphore requested) { - this.scoreDocsSink = scoreDocsSink; - this.requested = requested; + public ReactiveCollectorMultiManager() { } - public CollectorManager get(int shardIndex) { + public CollectorManager get(LongSemaphore requested, + FluxSink scoreDocsSink, + int shardIndex) { return new CollectorManager<>() { @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java index 85b6d97..bdf18d6 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/UnsortedUnscoredStreamingMultiSearcher.java @@ -1,5 +1,8 @@ package it.cavallium.dbengine.lucene.searcher; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; + import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; @@ -11,8 +14,10 @@ import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInpu import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -28,12 +33,10 @@ import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuples; public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { - private static final ExecutorService SCHEDULER = Executors.newCachedThreadPool(new ShortNamedThreadFactory( - "UnscoredStreamingSearcher")); - @Override public Mono collectMulti(Mono> indexSearchersMono, LocalQueryParams queryParams, @@ -61,47 +64,32 @@ public class UnsortedUnscoredStreamingMultiSearcher implements MultiSearcher { } var shards = indexSearchers.shards(); - Flux scoreDocsFlux = Flux.create(scoreDocsSink -> { - var requested = new LongSemaphore(0); - var cmm = new ReactiveCollectorMultiManager(scoreDocsSink, requested); + var cmm = new ReactiveCollectorMultiManager(); - scoreDocsSink.onRequest(requested::release); + Flux scoreDocsFlux = Flux.fromIterable(shards) + .index() + .flatMap(tuple -> Flux.create(scoreDocsSink -> { + LLUtils.ensureBlocking(); + var index = toIntExact(requireNonNull(tuple.getT1())); + var shard = tuple.getT2(); + var requested = new LongSemaphore(0); + var collectorManager = cmm.get(requested, scoreDocsSink, index); - int mutableShardIndex = 0; - CompletableFuture[] futures = new CompletableFuture[shards.size()]; - for (IndexSearcher shard : shards) { - int shardIndex = mutableShardIndex++; - assert queryParams.computePreciseHitsCount() == cmm.scoreMode().isExhaustive(); + assert queryParams.computePreciseHitsCount() == cmm.scoreMode().isExhaustive(); + + scoreDocsSink.onRequest(requested::release); - var future = CompletableFuture.runAsync(() -> { try { - LLUtils.ensureBlocking(); - var collectorManager = cmm.get(shardIndex); shard.search(localQueryParams.query(), collectorManager.newCollector()); + scoreDocsSink.complete(); } catch (IOException e) { - throw new CompletionException(e); + scoreDocsSink.error(e); } - }, SCHEDULER); - - futures[shardIndex] = future; - } - var combinedFuture = CompletableFuture.allOf(futures).whenCompleteAsync((result, ex) -> { - if (ex != null) { - scoreDocsSink.error(ex); - } else { - scoreDocsSink.complete(); - } - }); - scoreDocsSink.onDispose(() -> { - for (CompletableFuture future : futures) { - future.cancel(true); - } - combinedFuture.cancel(true); - }); - }, OverflowStrategy.BUFFER).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.boundedElastic()); + }, OverflowStrategy.BUFFER).subscribeOn(Schedulers.boundedElastic())); - Flux resultsFlux = LuceneUtils.convertHits(scoreDocsFlux, shards, keyFieldName, false); + Flux resultsFlux = LuceneUtils + .convertHits(scoreDocsFlux.publishOn(Schedulers.boundedElastic()), shards, keyFieldName, false); var totalHitsCount = new TotalHitsCount(0, false); Flux mergedFluxes = resultsFlux