diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 03ed367..4ab9419 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -59,6 +59,7 @@ import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -563,62 +564,74 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { requestsAvailable.release(); }); - luceneQueryScheduler.schedule(() -> { - try { - requestsAvailable.acquire(); - if (!cancelled.get()) { - if (doDistributedPre) { - allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); - sink.next(new LLTotalHitsCount(0L)); - } else { - int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); - streamSearcher.search(indexSearcher, - luceneQuery, - boundedLimit, - luceneSort, - luceneScoreMode, - minCompetitiveScore, - keyFieldName, - keyScore -> { - try { - if (requests.get() <= 0 && !cancelled.get()) { - requestsAvailable.acquire(); - } - if (cancelled.get()) { + try { + luceneQueryScheduler.schedule(() -> { + try { + if (!cancelled.get()) { + if (doDistributedPre) { + allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); + sink.next(new LLTotalHitsCount(0L)); + } else { + int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); + streamSearcher.search(indexSearcher, + luceneQuery, + boundedLimit, + luceneSort, + luceneScoreMode, + minCompetitiveScore, + keyFieldName, + keyScore -> { + try { + if (cancelled.get()) { + return HandleResult.HALT; + } + while (requests.get() <= 0) { + requestsAvailable.acquire(); + if (cancelled.get()) { + return HandleResult.HALT; + } + } + requests.decrementAndGet(); + sink.next(fixKeyScore(keyScore, scoreDivisor)); + return HandleResult.CONTINUE; + } catch (Exception ex) { + sink.error(ex); + cancelled.set(true); + requestsAvailable.release(); return HandleResult.HALT; } - requests.updateAndGet(n -> n > 0 ? n - 1 : 0); - sink.next(fixKeyScore(keyScore, scoreDivisor)); - return HandleResult.CONTINUE; - } catch (Exception ex) { - sink.error(ex); - cancelled.set(true); - requestsAvailable.release(); - return HandleResult.HALT; - } - }, - totalHitsCount -> { - try { - if (requests.get() <= 0 && !cancelled.get()) { - requestsAvailable.acquire(); + }, + totalHitsCount -> { + try { + if (cancelled.get()) { + return; + } + while (requests.get() <= 0) { + requestsAvailable.acquire(); + if (cancelled.get()) { + return; + } + } + requests.decrementAndGet(); + sink.next(new LLTotalHitsCount(totalHitsCount)); + } catch (Exception ex) { + sink.error(ex); + cancelled.set(true); + requestsAvailable.release(); } - requests.updateAndGet(n -> n > 0 ? n - 1 : 0); - sink.next(new LLTotalHitsCount(totalHitsCount)); - } catch (Exception ex) { - sink.error(ex); - cancelled.set(true); - requestsAvailable.release(); } - } - ); + ); + } + sink.complete(); } - sink.complete(); + } catch (Exception ex) { + sink.error(ex); } - } catch (Exception ex) { - sink.error(ex); - } - }); - }).subscribeOn(Schedulers.boundedElastic())))); + }); + } catch (Exception ex) { + sink.error(ex); + } + }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic())))); } @Override