diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 25e968c..03ed367 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -451,18 +451,19 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { luceneQuery = mltQuery; } - return luceneSearch(doDistributedPre, - indexSearcher, - queryParams.getLimit(), - queryParams.getMinCompetitiveScore().getNullable(), - keyFieldName, - scoreDivisor, - luceneQuery, - QueryParser.toSort(queryParams.getSort()), - QueryParser.toScoreMode(queryParams.getScoreMode()) - ); + return luceneQuery; }) .subscribeOn(luceneQueryScheduler) + .map(luceneQuery -> luceneSearch(doDistributedPre, + indexSearcher, + queryParams.getLimit(), + queryParams.getMinCompetitiveScore().getNullable(), + keyFieldName, + scoreDivisor, + luceneQuery, + QueryParser.toSort(queryParams.getSort()), + QueryParser.toScoreMode(queryParams.getScoreMode()) + )) .materialize() .flatMap(signal -> { if (signal.isOnComplete() || signal.isOnError()) { @@ -507,7 +508,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { }) .subscribeOn(luceneQueryScheduler) .flatMap(tuple -> Mono - .fromCallable(() -> { + .fromSupplier(() -> { Query luceneQuery = tuple.getT1(); Sort luceneSort = tuple.getT2().orElse(null); ScoreMode luceneScoreMode = tuple.getT3(); @@ -522,7 +523,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { luceneSort, luceneScoreMode ); - }).subscribeOn(luceneQueryScheduler) + }) ) .materialize() .flatMap(signal -> { @@ -545,7 +546,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { Query luceneQuery, Sort luceneSort, ScoreMode luceneScoreMode) { - var searchFlux = Flux.create(sink -> { + return new LLSearchResult(Flux.just(Flux.defer(() -> Flux.create(sink -> { AtomicBoolean cancelled = new AtomicBoolean(); AtomicLong requests = new AtomicLong(); Semaphore requestsAvailable = new Semaphore(0); @@ -553,74 +554,71 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { cancelled.set(true); requestsAvailable.release(); }); + sink.onCancel(() -> { + cancelled.set(true); + requestsAvailable.release(); + }); sink.onRequest(delta -> { requests.addAndGet(delta); requestsAvailable.release(); }); - try { - //noinspection BlockingMethodInNonBlockingContext - requestsAvailable.acquire(); - requestsAvailable.release(); - if (!cancelled.get()) { - if (doDistributedPre) { - //noinspection BlockingMethodInNonBlockingContext - allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); - sink.next(new LLTotalHitsCount(0L)); - } else { - int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); - //noinspection BlockingMethodInNonBlockingContext - streamSearcher.search(indexSearcher, - luceneQuery, - boundedLimit, - luceneSort, - luceneScoreMode, - minCompetitiveScore, - keyFieldName, - keyScore -> { - try { - while (requests.get() <= 0 && !cancelled.get()) { - requestsAvailable.acquire(); - } - if (!cancelled.get()) { - requests.decrementAndGet(); + luceneQueryScheduler.schedule(() -> { + try { + requestsAvailable.acquire(); + if (!cancelled.get()) { + if (doDistributedPre) { + allowOnlyQueryParsingCollectorStreamSearcher.search(indexSearcher, luceneQuery); + sink.next(new LLTotalHitsCount(0L)); + } else { + int boundedLimit = Math.max(0, limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit); + streamSearcher.search(indexSearcher, + luceneQuery, + boundedLimit, + luceneSort, + luceneScoreMode, + minCompetitiveScore, + keyFieldName, + keyScore -> { + try { + if (requests.get() <= 0 && !cancelled.get()) { + requestsAvailable.acquire(); + } + if (cancelled.get()) { + return HandleResult.HALT; + } + requests.updateAndGet(n -> n > 0 ? n - 1 : 0); sink.next(fixKeyScore(keyScore, scoreDivisor)); return HandleResult.CONTINUE; - } else { + } catch (Exception ex) { + sink.error(ex); + cancelled.set(true); + requestsAvailable.release(); return HandleResult.HALT; } - } catch (Exception ex) { - sink.error(ex); - cancelled.set(true); - return HandleResult.HALT; - } - }, - totalHitsCount -> { - try { - while (requests.get() <= 0 && !cancelled.get()) { - requestsAvailable.acquire(); - } - if (!cancelled.get()) { - requests.decrementAndGet(); + }, + totalHitsCount -> { + try { + if (requests.get() <= 0 && !cancelled.get()) { + requestsAvailable.acquire(); + } + requests.updateAndGet(n -> n > 0 ? n - 1 : 0); sink.next(new LLTotalHitsCount(totalHitsCount)); + } catch (Exception ex) { + sink.error(ex); + cancelled.set(true); + requestsAvailable.release(); } - } catch (Exception ex) { - sink.error(ex); - cancelled.set(true); } - } - ); - } - if (!cancelled.get()) { + ); + } sink.complete(); } + } catch (Exception ex) { + sink.error(ex); } - } catch (Exception ex) { - sink.error(ex); - } - }).subscribeOn(luceneQueryScheduler); - - return new LLSearchResult(Flux.just(searchFlux)); + }); + }).subscribeOn(Schedulers.boundedElastic())))); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 140f4ea..d0b4036 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -238,13 +238,14 @@ public class LuceneUtils { .map(LuceneSignal::getTotalHitsCount) .reduce(Long::sum) .map(sum -> LuceneSignal.totalHitsCount(sum)); - return sortedValues.mergeWith(sortedTotalSize); + return Flux.merge(sortedValues, sortedTotalSize); } public static Flux mergeSignalStreamRaw(Flux> mappedKeys, MultiSort mappedSort, Long limit) { Flux> sharedMappedSignals = mappedKeys.publish().refCount(2); + Flux sortedValues = LuceneUtils .mergeStream(sharedMappedSignals.map(sub -> sub.filter(LLSignal::isValue)), mappedSort, limit); //noinspection Convert2MethodRef @@ -254,6 +255,7 @@ public class LuceneUtils { .map(LLSignal::getTotalHitsCount) .reduce(Long::sum) .map(sum -> new LLTotalHitsCount(sum)); - return sortedValues.mergeWith(sortedTotalSize); + + return Flux.merge(sortedValues, sortedTotalSize); } }