diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 011773e..8461dab 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -19,7 +19,6 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import it.cavallium.dbengine.lucene.searcher.AdaptiveStreamSearcher; import it.cavallium.dbengine.lucene.searcher.AllowOnlyQueryParsingCollectorStreamSearcher; import it.cavallium.dbengine.lucene.searcher.LuceneStreamSearcher; -import it.cavallium.dbengine.lucene.searcher.PagedStreamSearcher; import it.cavallium.dbengine.lucene.serializer.QueryParser; import java.io.IOException; import java.nio.file.Path; @@ -50,6 +49,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.warp.commonutils.log.Logger; +import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; @@ -66,6 +67,7 @@ import reactor.util.function.Tuples; public class LLLocalLuceneIndex implements LLLuceneIndex { + protected static final Logger logger = LoggerFactory.getLogger(LLLocalLuceneIndex.class); private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher(); private static final AllowOnlyQueryParsingCollectorStreamSearcher allowOnlyQueryParsingCollectorStreamSearcher = new AllowOnlyQueryParsingCollectorStreamSearcher(); /** @@ -508,8 +510,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Many topKeysSink = Sinks .many() .unicast() - .onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE)); - Schedulers.boundedElastic().schedule(() -> { + .onBackpressureBuffer(); + + var searchFlux = Flux.push(sink -> { try { if (doDistributedPre) { allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); @@ -525,26 +528,50 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { keyFieldName, keyScore -> { EmitResult result = topKeysSink.tryEmitNext(fixKeyScore(keyScore, scoreDivisor)); - if (result.isFailure() && result != EmitResult.FAIL_CANCELLED && result != EmitResult.FAIL_ZERO_SUBSCRIBER) { - throw new EmissionException(result); + if (result.isFailure()) { + if (result == EmitResult.FAIL_CANCELLED) { + logger.debug("Fail to emit next value: cancelled"); + } else if (result == EmitResult.FAIL_TERMINATED) { + logger.debug("Fail to emit next value: terminated"); + } else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) { + logger.error("Fail to emit next value: zero subscriber. You must subscribe to results before total hits if you specified a limit > 0!"); + sink.error(new EmissionException(result)); + throw new EmissionException(result); + } else { + throw new EmissionException(result); + } } }, totalHitsCount -> { EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount); - if (result.isFailure() && result != EmitResult.FAIL_CANCELLED && result != EmitResult.FAIL_ZERO_SUBSCRIBER) { - throw new EmissionException(result); + if (result.isFailure()) { + if (result == EmitResult.FAIL_CANCELLED) { + logger.debug("Fail to emit total hits count: cancelled"); + } else if (result == EmitResult.FAIL_TERMINATED) { + logger.debug("Fail to emit total hits count: terminated"); + } else if (result == EmitResult.FAIL_ZERO_SUBSCRIBER) { + logger.debug("Fail to emit total hits count: zero subscriber"); + } else { + sink.error(new EmissionException(result)); + throw new EmissionException(result); + } } } ); } topKeysSink.tryEmitComplete(); + sink.complete(); } catch (IOException e) { topKeysSink.tryEmitError(e); totalHitsCountSink.tryEmitError(e); + sink.error(e); } - }); + }).share(); - return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); + return new LLSearchResult( + Mono.firstWithValue(searchFlux.then(Mono.empty()), totalHitsCountSink.asMono()), + Flux.>merge(searchFlux.then(Mono.empty()), Flux.just(topKeysSink.asFlux())) + ); }).subscribeOn(luceneBlockingScheduler) ) .materialize()