package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; import io.netty5.util.Send; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.collector.DecimalBucketMultiCollectorManager; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class DecimalBucketMultiSearcher { protected static final Logger logger = LogManager.getLogger(DecimalBucketMultiSearcher.class); public Mono collectMulti(Mono indexSearchersMono, BucketParams bucketParams, @NotNull List queries, @Nullable Query normalizationQuery) { return Mono.usingWhen(indexSearchersMono, indexSearchers -> this // Search results .search(indexSearchers.shards(), bucketParams, queries, normalizationQuery) // Ensure that one result is always returned .single(), indexSearchers -> Mono.fromCallable(() -> { indexSearchers.close(); return null; }).transform(LuceneUtils::scheduleLucene)); } private Mono search(Iterable indexSearchers, BucketParams bucketParams, @NotNull List queries, @Nullable Query normalizationQuery) { return Mono.defer(() -> { var cmm = new DecimalBucketMultiCollectorManager(bucketParams.min(), bucketParams.max(), bucketParams.buckets(), bucketParams.bucketFieldName(), bucketParams.valueSource(), queries, normalizationQuery, bucketParams.collectionRate(), bucketParams.sampleSize() ); return Flux .fromIterable(indexSearchers) .flatMap(shard -> Mono.fromCallable(() -> { LLUtils.ensureBlocking(); return cmm.search(shard); }).subscribeOn(luceneScheduler())) .collectList() .flatMap(results -> Mono.fromSupplier(() -> cmm.reduce(results)).subscribeOn(luceneScheduler())) .publishOn(Schedulers.parallel()); }); } }